{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* [DataFrame API](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures-dataframes.ipynb)\n",
"* [GitHub](https://github.com/jkthompson/pyspark-pictures)\n",
"* [related blog post](http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Click on a picture to view pyspark docs"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"pyspark version:1.6.1\n",
"Ipython version:4.2.0\n"
]
}
],
"source": [
"# library versions used to produce the outputs below\n",
"import IPython\n",
"print(\"pyspark version:{}\".format(sc.version))\n",
"print(\"Ipython version:{}\".format(IPython.__version__))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[(1, 1), (2, 4), (3, 9)]\n"
]
}
],
"source": [
"# map: apply a function to every element, producing exactly one output per input\n",
"x = sc.parallelize([1, 2, 3])  # sc is the SparkContext; parallelize turns a local collection into an RDD\n",
"y = x.map(lambda n: (n, n ** 2))  # each element n becomes the pair (n, n squared)\n",
"print(x.collect())  # collect copies the RDD's elements into a list on the driver\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[1, 100, 1, 2, 200, 4, 3, 300, 9]\n"
]
}
],
"source": [
"# flatMap: apply a function that returns an iterable, then flatten all results into one RDD\n",
"x = sc.parallelize([1, 2, 3])\n",
"y = x.flatMap(lambda n: (n, 100 * n, n ** 2))  # three outputs per input element\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[1], [2, 3]]\n",
"[[1], [5]]\n"
]
}
],
"source": [
"# mapPartitions: call a function once per partition, passing it an iterator over that partition\n",
"x = sc.parallelize([1, 2, 3], 2)  # the second argument is the number of partitions\n",
"def f(iterator):\n",
"    yield sum(iterator)  # emit one element per partition: the sum of its values\n",
"y = x.mapPartitions(f)\n",
"print(x.glom().collect())  # glom() gathers each partition into its own list, making the partitioning visible\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[1], [2, 3]]\n",
"[[(0, 1)], [(1, 5)]]\n"
]
}
],
"source": [
"# mapPartitionsWithIndex: like mapPartitions, but the function also receives the partition's index\n",
"x = sc.parallelize([1, 2, 3], 2)\n",
"def f(partitionIndex, iterator):\n",
"    yield (partitionIndex, sum(iterator))  # tag each partition's sum with the partition index\n",
"y = x.mapPartitionsWithIndex(f)\n",
"print(x.glom().collect())  # glom() gathers each partition into its own list, making the partitioning visible\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[1], [2, 3]]\n",
"2\n"
]
}
],
"source": [
"# getNumPartitions: how many partitions the RDD is split into\n",
"x = sc.parallelize([1, 2, 3], 2)\n",
"y = x.getNumPartitions()  # a plain Python int, not an RDD\n",
"print(x.glom().collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[1, 3]\n"
]
}
],
"source": [
"# filter: keep only the elements for which the predicate returns True\n",
"x = sc.parallelize([1, 2, 3])\n",
"y = x.filter(lambda n: n % 2 == 1)  # odd values pass, even values are dropped\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['A', 'A', 'B']\n",
"['A', 'B']\n"
]
}
],
"source": [
"# distinct: drop duplicate elements\n",
"x = sc.parallelize(['A', 'A', 'B'])\n",
"y = x.distinct()  # every value appears at most once in y\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"x = [0, 1, 2, 3, 4, 5, 6]\n",
"sample:0 y = [0, 6]\n",
"sample:1 y = [4]\n",
"sample:2 y = [1, 2, 3]\n",
"sample:3 y = [2, 3, 5, 6]\n",
"sample:4 y = [1, 2]\n"
]
}
],
"source": [
"# sample: a random subset of the RDD (a transformation, so the result is another RDD)\n",
"x = sc.parallelize(range(7))\n",
"# call 'sample' 5 times; fraction=0.5 is the expected share kept, so subset sizes vary.\n",
"# seed=i makes each draw reproducible on re-run while keeping the 5 draws different.\n",
"ylist = [x.sample(withReplacement=False, fraction=0.5, seed=i) for i in range(5)]\n",
"print('x = ' + str(x.collect()))\n",
"for cnt, y in enumerate(ylist):\n",
"    print('sample:' + str(cnt) + ' y = ' + str(y.collect()))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"x = [0, 1, 2, 3, 4, 5, 6]\n",
"sample:0 y = [5, 4, 3]\n",
"sample:1 y = [4, 0, 2]\n",
"sample:2 y = [1, 2, 4]\n",
"sample:3 y = [5, 6, 0]\n",
"sample:4 y = [3, 1, 6]\n"
]
}
],
"source": [
"# takeSample: exactly num random elements, returned to the driver as a list (an action)\n",
"x = sc.parallelize(range(7))\n",
"# call 'takeSample' 5 times; seed=i makes each draw reproducible while keeping them different\n",
"ylist = [x.takeSample(withReplacement=False, num=3, seed=i) for i in range(5)]\n",
"print('x = ' + str(x.collect()))\n",
"for cnt, y in enumerate(ylist):\n",
"    print('sample:' + str(cnt) + ' y = ' + str(y))  # y is already a local list, so no collect()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['A', 'A', 'B']\n",
"['D', 'C', 'A']\n",
"['A', 'A', 'B', 'D', 'C', 'A']\n"
]
}
],
"source": [
"# union: concatenate two RDDs; duplicates are kept\n",
"x = sc.parallelize(['A', 'A', 'B'])\n",
"y = sc.parallelize(['D', 'C', 'A'])\n",
"z = x.union(y)  # x's elements followed by y's\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['A', 'A', 'B']\n",
"['A', 'C', 'D']\n",
"['A']\n"
]
}
],
"source": [
"# intersection: elements present in both RDDs, each reported once\n",
"x = sc.parallelize(['A', 'A', 'B'])\n",
"y = sc.parallelize(['A', 'C', 'D'])\n",
"z = x.intersection(y)  # 'A' occurs twice in x but only once in z\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('A', 2), ('C', 3)]\n",
"[('A', 2), ('B', 1), ('C', 3)]\n"
]
}
],
"source": [
"# sortByKey: order (key, value) pairs by their key\n",
"x = sc.parallelize([('B', 1), ('A', 2), ('C', 3)])\n",
"y = x.sortByKey()  # default is ascending=True\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['Apple', 'Bat', 'Cat']\n"
]
}
],
"source": [
"# sortBy: order elements by a key computed from each element\n",
"x = sc.parallelize(['Cat', 'Apple', 'Bat'])\n",
"def keyGen(val):\n",
"    return val[0]  # sort key: the first character of the string\n",
"y = x.sortBy(keyGen)\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['C', 'B', 'A']\n",
"[['C'], ['B', 'A']]\n"
]
}
],
"source": [
"# glom: replace each partition by a single list of its elements\n",
"x = sc.parallelize(['C', 'B', 'A'], 2)\n",
"y = x.glom()  # one list per partition\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['A', 'B']\n",
"['C', 'D']\n",
"[('A', 'C'), ('A', 'D'), ('B', 'C'), ('B', 'D')]\n"
]
}
],
"source": [
"# cartesian: every pair (a, b) with a taken from x and b taken from y\n",
"x = sc.parallelize(['A', 'B'])\n",
"y = sc.parallelize(['C', 'D'])\n",
"z = x.cartesian(y)  # 2 x 2 = 4 pairs here\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[('A', [1, 3]), ('B', [2])]\n"
]
}
],
"source": [
"# groupBy: group elements under the key a function computes for each of them\n",
"x = sc.parallelize([1, 2, 3])\n",
"y = x.groupBy(lambda n: 'A' if n % 2 == 1 else 'B')  # odd -> 'A', even -> 'B'\n",
"print(x.collect())\n",
"# each group's values come back as an iterable, so turn them into lists for printing\n",
"print([(key, list(values)) for key, values in y.collect()])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['A', 'Ba', 'C', 'AD']\n",
"[u'A', u'Ba', u'AD']\n"
]
}
],
"source": [
"# pipe: send the elements through an external shell command; its output lines form the new RDD\n",
"x = sc.parallelize(['A', 'Ba', 'C', 'AD'])\n",
"y = x.pipe('grep -i \"A\"')  # runs grep as a subprocess, so this may fail where grep is unavailable (e.g. Windows)\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"None\n",
"1\n",
"3\n",
"2\n",
"\n"
]
}
],
"source": [
"# foreach\n",
"from __future__ import print_function\n",
"x = sc.parallelize([1, 2, 3])\n",
"def f(el):\n",
"    '''side effect: append the current RDD element to a file'''\n",
"    # the with-block closes (and so flushes) the file after every write instead of\n",
"    # leaving the handle open until garbage collection\n",
"    with open(\"./foreachExample.txt\", 'a+') as f1:\n",
"        print(el, file=f1)\n",
"\n",
"open('./foreachExample.txt', 'w').close()  # first clear the file contents\n",
"\n",
"# NOTE: f runs on the executors; reading the file back below assumes they share the\n",
"# driver's working directory (e.g. local mode). Line order in the file is not guaranteed.\n",
"y = x.foreach(f)  # writes into foreachExample.txt\n",
"\n",
"print(x.collect())\n",
"print(y)  # foreach returns 'None'\n",
"# print the contents of foreachExample.txt\n",
"with open(\"./foreachExample.txt\", \"r\") as foreachExample:\n",
"    print(foreachExample.read())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[], [1], [], [2], [3]]\n",
"None\n",
"[]\n",
"[1]\n",
"[]\n",
"[2]\n",
"[3]\n",
"\n"
]
}
],
"source": [
"# foreachPartition\n",
"from __future__ import print_function\n",
"x = sc.parallelize([1, 2, 3], 5)\n",
"def f(partition):\n",
"    '''side effect: append the current RDD partition contents to a file'''\n",
"    # the with-block closes (and so flushes) the file instead of leaking the handle\n",
"    with open(\"./foreachPartitionExample.txt\", 'a+') as f1:\n",
"        print(list(partition), file=f1)\n",
"\n",
"open('./foreachPartitionExample.txt', 'w').close()  # first clear the file contents\n",
"\n",
"# NOTE: f runs on the executors; reading the file back below assumes they share the\n",
"# driver's working directory (e.g. local mode).\n",
"y = x.foreachPartition(f)  # writes one line per partition into foreachPartitionExample.txt\n",
"\n",
"print(x.glom().collect())\n",
"print(y)  # foreachPartition returns 'None'\n",
"# print the contents of foreachPartitionExample.txt\n",
"with open(\"./foreachPartitionExample.txt\", \"r\") as foreachPartitionExample:\n",
"    print(foreachPartitionExample.read())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"ParallelCollectionRDD[84] at parallelize at PythonRDD.scala:423\n",
"[1, 2, 3]\n"
]
}
],
"source": [
"# collect: an action that copies every element of the RDD to the driver as a Python list\n",
"x = sc.parallelize([1, 2, 3])\n",
"y = x.collect()\n",
"print(x)  # the RDD itself: a handle to distributed data, printed as its description\n",
"print(y)  # a plain local list"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"6\n"
]
}
],
"source": [
"# reduce\n",
"x = sc.parallelize([1, 2, 3])\n",
"# the operator combines two values at a time (elements or partial results from partitions),\n",
"# so it must be commutative and associative\n",
"y = x.reduce(lambda accumulated, obj: accumulated + obj)  # total sum of the elements\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"6\n"
]
}
],
"source": [
"# fold\n",
"x = sc.parallelize([1, 2, 3])\n",
"# the zero value seeds every partition and the final merge, so it must be the operator's identity\n",
"neutral_zero_value = 0  # 0 for sum, 1 for multiplication\n",
"y = x.fold(neutral_zero_value, lambda accumulated, obj: accumulated + obj)  # total sum of the elements\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[2, 3, 4]\n",
"(9, 24)\n"
]
}
],
"source": [
"# aggregate\n",
"x = sc.parallelize([2, 3, 4])\n",
"neutral_zero_value = (0, 1)  # (sum identity, product identity): x + 0 = x, 1 * x = x\n",
"# seqOp folds one element into a partition's running (sum, product) pair\n",
"seqOp = (lambda aggregated, el: (aggregated[0] + el, aggregated[1] * el))\n",
"# combOp merges the (sum, product) pairs produced by two partitions\n",
"combOp = (lambda agg1, agg2: (agg1[0] + agg2[0], agg1[1] * agg2[1]))\n",
"y = x.aggregate(neutral_zero_value, seqOp, combOp)  # computes (sum, product) of all elements\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"3\n"
]
}
],
"source": [
"# max: the largest element (an action, so a plain Python value comes back)\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.max()  # uses the elements' natural ordering\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"1\n"
]
}
],
"source": [
"# min: the smallest element (an action, so a plain Python value comes back)\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.min()  # uses the elements' natural ordering\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"6\n"
]
}
],
"source": [
"# sum: add up all elements of the RDD\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.sum()  # a plain Python number\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"3\n"
]
}
],
"source": [
"# count: number of elements in the RDD\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.count()  # a plain Python int\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 29,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"([1, 2, 3], [2, 3])\n"
]
}
],
"source": [
"# histogram (example #1): an int asks for that many evenly spaced buckets between min and max\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.histogram(buckets=2)  # returns (bucket boundaries, count per bucket)\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5], [0, 0, 2, 0, 1, 0, 2])\n"
]
}
],
"source": [
"# histogram (example #2): a list gives the bucket boundaries explicitly\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.histogram([0, 0.5, 1, 1.5, 2, 2.5, 3, 3.5])  # 8 boundaries define 7 buckets\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 31,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"2.0\n"
]
}
],
"source": [
"# mean: arithmetic average of the elements\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.mean()  # returned as a float\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 32,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"0.666666666667\n"
]
}
],
"source": [
"# variance: population variance of the elements\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.variance()  # divides by N (see sampleVariance for N-1)\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 33,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"0.816496580928\n"
]
}
],
"source": [
"# stdev: population standard deviation of the elements\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.stdev()  # divides by N (see sampleStdev for N-1)\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 34,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"1.0\n"
]
}
],
"source": [
"# sampleStdev: sample standard deviation of the elements\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.sampleStdev()  # divides by N-1 (Bessel's correction)\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 35,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 2]\n",
"1.0\n"
]
}
],
"source": [
"# sampleVariance: sample variance of the elements\n",
"x = sc.parallelize([1, 3, 2])\n",
"y = x.sampleVariance()  # divides by N-1 (Bessel's correction)\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 36,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"defaultdict(, {1: 2, 2: 1, 3: 2})\n"
]
}
],
"source": [
"# countByValue: occurrences of each distinct value, returned to the driver\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.countByValue()  # a defaultdict mapping value -> count\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 37,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"[3, 3, 2]\n"
]
}
],
"source": [
"# top: the num largest elements, largest first\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.top(num=3)  # duplicates count separately\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 38,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"[1, 1, 2]\n"
]
}
],
"source": [
"# takeOrdered: the num smallest elements, smallest first\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.takeOrdered(num=3)  # duplicates count separately\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"[1, 3, 1]\n"
]
}
],
"source": [
"# take: the first num elements, in the RDD's order\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.take(num=3)  # no sorting is applied\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 3, 1, 2, 3]\n",
"1\n"
]
}
],
"source": [
"# first: the first element of the RDD\n",
"x = sc.parallelize([1, 3, 1, 2, 3])\n",
"y = x.first()  # a single value, not a list\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 41,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 3), ('A', 1), ('B', 2)]\n",
"{'A': 1, 'C': 3, 'B': 2}\n"
]
}
],
"source": [
"# collectAsMap: bring a (key, value) RDD back to the driver as a dict\n",
"x = sc.parallelize([('C', 3), ('A', 1), ('B', 2)])\n",
"y = x.collectAsMap()  # a plain local dict\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 42,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 3), ('A', 1), ('B', 2)]\n",
"['C', 'A', 'B']\n"
]
}
],
"source": [
"# keys: the key of every (key, value) pair\n",
"x = sc.parallelize([('C', 3), ('A', 1), ('B', 2)])\n",
"y = x.keys()  # an RDD, in the same order as x\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 43,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 3), ('A', 1), ('B', 2)]\n",
"[3, 1, 2]\n"
]
}
],
"source": [
"# values: the value of every (key, value) pair\n",
"x = sc.parallelize([('C', 3), ('A', 1), ('B', 2)])\n",
"y = x.values()  # an RDD, in the same order as x\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 44,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"[('A', 12), ('B', 3)]\n"
]
}
],
"source": [
"# reduceByKey: merge the values of each key with a binary function; the result is an RDD\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"y = x.reduceByKey(lambda agg, obj: agg + obj)  # per-key sum\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 45,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"{'A': 12, 'B': 3}\n"
]
}
],
"source": [
"# reduceByKeyLocally: like reduceByKey, but the result comes back to the driver as a dict\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"y = x.reduceByKeyLocally(lambda agg, obj: agg + obj)  # per-key sum\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 46,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"defaultdict(, {'A': 3, 'B': 2})\n"
]
}
],
"source": [
"# countByKey: number of pairs per key, returned to the driver\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"y = x.countByKey()  # a defaultdict mapping key -> count\n",
"print(x.collect())\n",
"print(y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 47,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n",
"[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n",
"[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7))]\n"
]
}
],
"source": [
"# join: inner join on the key; every matching (x value, y value) combination is emitted\n",
"x = sc.parallelize([('C', 4), ('B', 3), ('A', 2), ('A', 1)])\n",
"y = sc.parallelize([('A', 8), ('B', 7), ('A', 6), ('D', 5)])\n",
"z = x.join(y)  # keys found on only one side ('C', 'D') are dropped\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n",
"[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n",
"[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('C', (4, None)), ('B', (3, 7))]\n"
]
}
],
"source": [
"# leftOuterJoin: keep every key of x; a missing y value becomes None\n",
"x = sc.parallelize([('C', 4), ('B', 3), ('A', 2), ('A', 1)])\n",
"y = sc.parallelize([('A', 8), ('B', 7), ('A', 6), ('D', 5)])\n",
"z = x.leftOuterJoin(y)  # 'C' has no match in y, so it is paired with None\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 49,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n",
"[('A', 8), ('B', 7), ('A', 6), ('D', 5)]\n",
"[('A', (2, 8)), ('A', (2, 6)), ('A', (1, 8)), ('A', (1, 6)), ('B', (3, 7)), ('D', (None, 5))]\n"
]
}
],
"source": [
"# rightOuterJoin: keep every key of y; a missing x value becomes None\n",
"x = sc.parallelize([('C', 4), ('B', 3), ('A', 2), ('A', 1)])\n",
"y = sc.parallelize([('A', 8), ('B', 7), ('A', 6), ('D', 5)])\n",
"z = x.rightOuterJoin(y)  # 'D' has no match in x, so it is paired with None\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 50,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[(0, 1)], [(1, 2), (2, 3)]]\n",
"[[(0, 1)], [(1, 2)], [(2, 3)]]\n"
]
}
],
"source": [
"# partitionBy\n",
"x = sc.parallelize([(0, 1), (1, 2), (2, 3)], 2)\n",
"# only the key is passed to partitionFunc; a pair lands in partition partitionFunc(key) % numPartitions\n",
"y = x.partitionBy(numPartitions=3, partitionFunc=lambda key: key)\n",
"print(x.glom().collect())\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 51,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]\n"
]
}
],
"source": [
"# combineByKey: per-key aggregation described by three functions\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"createCombiner = (lambda el: [(el, el ** 2)])  # the first value seen for a key starts a new list\n",
"mergeVal = (lambda aggregated, el: aggregated + [(el, el ** 2)])  # append another value to that list\n",
"mergeComb = (lambda agg1, agg2: agg1 + agg2)  # concatenate two partial lists for the same key\n",
"y = x.combineByKey(createCombiner, mergeVal, mergeComb)\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 52,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]\n"
]
}
],
"source": [
"# aggregateByKey: per-key aggregation that starts from a zero value\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"zeroValue = []  # the empty list is the neutral starting point for appending\n",
"mergeVal = (lambda aggregated, el: aggregated + [(el, el ** 2)])  # add one value to a key's list\n",
"mergeComb = (lambda agg1, agg2: agg1 + agg2)  # concatenate two partial lists for the same key\n",
"y = x.aggregateByKey(zeroValue, mergeVal, mergeComb)\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 53,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)]\n",
"[('A', 60), ('B', 2)]\n"
]
}
],
"source": [
"# foldByKey: fold the values of each key, starting from a zero value\n",
"x = sc.parallelize([('B', 1), ('B', 2), ('A', 3), ('A', 4), ('A', 5)])\n",
"zeroValue = 1  # 1 is the neutral element for multiplication\n",
"y = x.foldByKey(zeroValue, lambda agg, val: agg * val)  # product of the values within each key\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 54,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)]\n",
"[('A', [3, 2, 1]), ('B', [5, 4])]\n"
]
}
],
"source": [
"# groupByKey: gather all values that share a key\n",
"x = sc.parallelize([('B', 5), ('B', 4), ('A', 3), ('A', 2), ('A', 1)])\n",
"y = x.groupByKey()\n",
"print(x.collect())\n",
"print([(key, list(values)) for key, values in y.collect()])  # values are iterables; list them for printing"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 55,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('A', (1, 2, 3)), ('B', (4, 5))]\n",
"[('A', 1), ('A', 4), ('A', 9), ('B', 16), ('B', 25)]\n"
]
}
],
"source": [
"# flatMapValues: map each value to an iterable and flatten it, keeping the key on every output\n",
"x = sc.parallelize([('A', (1, 2, 3)), ('B', (4, 5))])\n",
"y = x.flatMapValues(lambda vals: [v ** 2 for v in vals])  # the function sees the whole value; its result is flattened\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('A', (1, 2, 3)), ('B', (4, 5))]\n",
"[('A', [1, 4, 9]), ('B', [16, 25])]\n"
]
}
],
"source": [
"# mapValues: transform each value, leaving its key unchanged\n",
"x = sc.parallelize([('A', (1, 2, 3)), ('B', (4, 5))])\n",
"y = x.mapValues(lambda vals: [v ** 2 for v in vals])  # the function sees the whole value; no flattening\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]\n",
"[('B', (7, 7)), ('A', 6), ('D', (5, 5))]\n",
"[('D', 9), ('B', (8, 8))]\n",
"Result:\n",
"D [[], [(5, 5)], [9]]\n",
"C [[4], [], []]\n",
"B [[(3, 3)], [(7, 7)], [(8, 8)]]\n",
"A [[2, (1, 1)], [6], []]\n"
]
}
],
"source": [
"# groupWith: like cogroup, but over several RDDs at once\n",
"x = sc.parallelize([('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))])\n",
"y = sc.parallelize([('B', (7, 7)), ('A', 6), ('D', (5, 5))])\n",
"z = sc.parallelize([('D', 9), ('B', (8, 8))])\n",
"a = x.groupWith(y, z)  # for each key: one iterable of values per input RDD\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())\n",
"print(\"Result:\")\n",
"for key, val in a.collect():\n",
"    print(key, [list(i) for i in val])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 58,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))]\n",
"[('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))]\n",
"A [[2, (1, 1)], [8, 6]]\n",
"C [[4], []]\n",
"B [[(3, 3)], [7]]\n",
"D [[], [(5, 5)]]\n"
]
}
],
"source": [
"# cogroup: for each key, one iterable of values from x and one from y\n",
"x = sc.parallelize([('C', 4), ('B', (3, 3)), ('A', 2), ('A', (1, 1))])\n",
"y = sc.parallelize([('A', 8), ('B', 7), ('A', 6), ('D', (5, 5))])\n",
"z = x.cogroup(y)  # keys present on only one side get an empty iterable for the other\n",
"print(x.collect())\n",
"print(y.collect())\n",
"for key, val in z.collect():\n",
"    print(key, [list(i) for i in val])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 59,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)]\n",
"[('A', 1), ('B', 2), ('B', 4)]\n"
]
}
],
"source": [
"# sampleByKey: stratified sample with a separate sampling fraction per key\n",
"x = sc.parallelize([('A', 1), ('B', 2), ('C', 3), ('B', 4), ('A', 5)])\n",
"# a fixed seed keeps the sample reproducible when the notebook is re-run\n",
"y = x.sampleByKey(withReplacement=False, fractions={'A': 0.5, 'B': 1, 'C': 0.2}, seed=0)\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 60,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 1), ('B', 2), ('A', 3), ('A', 4)]\n",
"[('A', 5), ('D', 6), ('A', 7), ('D', 8)]\n",
"[('C', 1), ('B', 2)]\n"
]
}
],
"source": [
"# subtractByKey: keep the pairs of x whose key does not occur in y\n",
"x = sc.parallelize([('C', 1), ('B', 2), ('A', 3), ('A', 4)])\n",
"y = sc.parallelize([('A', 5), ('D', 6), ('A', 7), ('D', 8)])\n",
"z = x.subtractByKey(y)  # only y's keys matter; its values are ignored\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 61,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[('C', 4), ('B', 3), ('A', 2), ('A', 1)]\n",
"[('C', 8), ('A', 2), ('D', 1)]\n",
"[('A', 1), ('C', 4), ('B', 3)]\n"
]
}
],
"source": [
"# subtract: keep the elements of x that do not occur in y (whole elements are compared)\n",
"x = sc.parallelize([('C', 4), ('B', 3), ('A', 2), ('A', 1)])\n",
"y = sc.parallelize([('C', 8), ('A', 2), ('D', 1)])\n",
"z = x.subtract(y)  # ('A', 2) is removed; ('C', 4) stays because ('C', 8) is a different element\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[1, 2, 3]\n",
"[(1, 1), (4, 2), (9, 3)]\n"
]
}
],
"source": [
"# keyBy: build (f(element), element) pairs\n",
"x = sc.parallelize([1, 2, 3])\n",
"y = x.keyBy(lambda n: n ** 2)  # the key is the square, the value is the original element\n",
"print(x.collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 63,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[1, 2], [3, 4, 5]]\n",
"[[], [1, 2, 3, 4], [5]]\n"
]
}
],
"source": [
"# repartition: redistribute the data into exactly numPartitions partitions\n",
"x = sc.parallelize([1, 2, 3, 4, 5], 2)\n",
"y = x.repartition(numPartitions=3)  # uses a shuffle, so partitions can end up uneven or even empty\n",
"print(x.glom().collect())\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 64,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[[1, 2], [3, 4, 5]]\n",
"[[1, 2, 3, 4, 5]]\n"
]
}
],
"source": [
"# coalesce: reduce the number of partitions (no shuffle by default)\n",
"x = sc.parallelize([1, 2, 3, 4, 5], 2)\n",
"y = x.coalesce(numPartitions=1)  # merges the existing partitions\n",
"print(x.glom().collect())\n",
"print(y.glom().collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 65,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['B', 'A', 'A']\n",
"[66, 65, 65]\n",
"[('B', 66), ('A', 65), ('A', 65)]\n"
]
}
],
"source": [
"# zip: pair the i-th element of x with the i-th element of y\n",
"x = sc.parallelize(['B', 'A', 'A'])\n",
"y = x.map(lambda c: ord(c))  # derived from x, so it has the same #partitions and #elements/partition that zip requires\n",
"z = x.zip(y)\n",
"print(x.collect())\n",
"print(y.collect())\n",
"print(z.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 66,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[['B'], ['A', 'A']]\n",
"[('B', 0), ('A', 1), ('A', 2)]\n"
]
}
],
"source": [
"# zipWithIndex: pair every element with its position in the RDD\n",
"x = sc.parallelize(['B', 'A', 'A'], 2)\n",
"y = x.zipWithIndex()  # consecutive indices 0, 1, 2, ... across all partitions\n",
"print(x.glom().collect())\n",
"print(y.collect())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"\n",
""
]
},
{
"cell_type": "code",
"execution_count": 67,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[['B'], ['A', 'A']]\n",
"[('B', 0), ('A', 1), ('A', 3)]\n"
]
}
],
"source": [
"# zipWithUniqueId: pair every element with a unique, but not necessarily consecutive, id\n",
"x = sc.parallelize(['B', 'A', 'A'], 2)\n",
"y = x.zipWithUniqueId()  # partition k hands out ids k, n+k, 2n+k, ... (n = number of partitions)\n",
"print(x.glom().collect())\n",
"print(y.collect())"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 1
}