- Reference (setting up PySpark to use Python 3 via spark-env.sh): http://stackoverflow.com/questions/30940631/how-do-i-setup-pyspark-in-python-3-with-spark-env-sh-template

In [1]:
!python --version
! date

Python 3.4.5 :: Continuum Analytics, Inc.
Thu Nov 3 15:00:16 CST 2016


In [2]:
# `sc` is never created in this notebook; presumably it is the SparkContext
# injected by the pyspark launcher/kernel at startup — confirm for your setup.
sc



In [3]:
# Master URL this SparkContext is connected to (recorded below as 'local[*]').
sc.master

'local[*]'

In [4]:
import datetime as dt 
import time
import pandas as pd, numpy as np
import pprint
import matplotlib.pyplot as plt
import seaborn as sns
%pylab inline

Populating the interactive namespace from numpy and matplotlib




# Running Spark (PySpark) in an IPython notebook

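# Index

- RDD basics: creating RDDs, transformations (map, filter, distinct, random split, group by, union / intersection / subtract) and actions
- Key-value RDDs: transformations (reduce by key, joins) and actions
- Broadcast variables
- Case study: word count (map / flatMap / reduceByKey)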

# Create RDD

In [5]:
# Distribute two small local Python lists as RDDs.
# Note: despite its name, intRDD2 holds strings, not ints.
intRDD = sc.parallelize([6, 7, 1, 2, 0])
intRDD2 = sc.parallelize(["apple", "car", "pan"])

In [6]:
# parallelize() hands back an RDD object, not a Python list.
type(intRDD)

pyspark.rdd.RDD

In [7]:
# collect() is an action: it pulls every element back into this Python process as a list.
intRDD.collect()

[6, 7, 1, 2, 0]

In [8]:
# Same for the string RDD.
intRDD2.collect()

['apple', 'car', 'pan']

# Transformations

- Transformations are lazy: nothing is computed until an action (e.g. `collect()`) is run.

# Map 

In [9]:
def addone(x):
    """Return ``x`` plus one; used below as a per-element map function."""
    return x + 1

In [10]:
# To make Spark's Python workers run Python 3 as well, set the environment
# variable before launching Spark: `export PYSPARK_PYTHON=python3`.
# map() applies addone to every element and returns a new RDD.
intRDD.map(addone).collect()

[7, 8, 2, 3, 1]

In [11]:
# Equivalent, using an inline lambda instead of a named function.

intRDD.map(lambda x : x + 1 ).collect()

[7, 8, 2, 3, 1]

In [12]:
# map() works on any element type: here, prefix each word of the string RDD.

intRDD2.map(lambda x : "object: "+ x ).collect()

['object: apple', 'object: car', 'object: pan']

# Filter

In [13]:
# Keep only the elements greater than 5.
intRDD.filter(lambda value: value > 5).collect()

[6, 7]

In [14]:
# Keep the elements strictly between 1 and 5.
# Do not write this as `x > 1 & x < 5`: `&` binds tighter than `>`/`<`, so that
# parses as the chained comparison `x > (1 & x) < 5` and wrongly keeps 6 and 7.
intRDD.filter(lambda x: 1 < x < 5).collect()

[2]

In [15]:
# Keep only the words that contain the substring "ar".
intRDD2.filter(lambda word: "ar" in word).collect()

['car']

# Distinct

In [16]:
# Two RDDs containing duplicates (intRDD4 holds strings despite its name).
intRDD3 = sc.parallelize([1,1,2,2,3])
intRDD4 = sc.parallelize(["apple","apple","car", "car", "pan"])

In [17]:
# distinct() drops duplicate elements.
intRDD3.distinct().collect()

[1, 2, 3]

In [18]:
# The order of distinct() results is not the input order (see the output below).
intRDD4.distinct().collect()

['car', 'pan', 'apple']

# Random split

In [19]:
# Randomly split intRDD into two RDDs with relative weights 0.4 / 0.6.
# Without an explicit seed every fresh run picks a new one, so the split (and
# the outputs of the next cells) would change from run to run.
SPLIT_SEED = 42
sRDD = intRDD.randomSplit([0.4, 0.6], seed=SPLIT_SEED)

In [20]:
# First split (weight 0.4). Sizes are only roughly proportional to the weights,
# especially for an RDD this small.
sRDD[0].collect()

[7, 1, 2]

In [21]:
# Second split (weight 0.6): the remaining elements.
sRDD[1].collect()

[6, 0]

# Group by 

In [22]:
# Group elements by parity. Because .collect() is applied, gRDD is a plain Python
# list of (key, grouped values) pairs, not an RDD, despite its name.
gRDD = intRDD.groupBy(lambda x : "even" if (x%2 ==0 ) else "odd").collect()

In [23]:
type(gRDD)

list

In [24]:
# The grouped values are iterable objects, not lists, so print shows their object
# repr rather than the numbers; the next cells use sorted(...) to materialise them.
print (gRDD)

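[('even', <pyspark.resultiterable.ResultIterable object at 0x…>), ('odd', <pyspark.resultiterable.ResultIterable object at 0x…>)]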


In [25]:
# Look the group up by key: collect() does not guarantee that 'even' comes first,
# so positional indexing (gRDD[0]) could silently return the 'odd' group instead.
groups = dict(gRDD)
"even", sorted(groups["even"])

('even', [0, 2, 6])

In [26]:
# Look the group up by key rather than by position: the order of groups returned
# by collect() is not guaranteed, so gRDD[1] is not necessarily the 'odd' group.
groups = dict(gRDD)
"odd", sorted(groups["odd"])

('odd', [1, 7])

# Multiple RDDs

In [27]:
# Three small RDDs for the set-style operations below (2 is duplicated in intRDD5).
intRDD5 = sc.parallelize([3, 1, 2, 2, 5])
intRDD6 = sc.parallelize([1, 0])
intRDD7 = sc.parallelize([4, 5, 6])

# Union

In [28]:
# union() concatenates the RDDs and keeps duplicates (1, 2 and 5 repeat below).
intRDD5.union(intRDD6).union(intRDD7).collect()

[3, 1, 2, 2, 5, 1, 0, 4, 5, 6]

# Intersection

In [29]:
# Elements present in both RDDs.
intRDD5.intersection(intRDD6).collect()

[1]

# Subtract

In [30]:
# Elements of intRDD5 that are not in intRDD7 (5 is removed); duplicates are kept
# and the order can differ from the input, as the output shows.
intRDD5.subtract(intRDD7).collect()

[1, 2, 2, 3]

# Actions

- Spark computes an RDD only when an action is executed; the action triggers all pending transformations.

In [31]:
# Re-create intRDD (same data as at the top) so this section can run on its own.
intRDD = sc.parallelize([6,7,1,2,0])

In [32]:
# first(): the first element in RDD order (no sorting).
intRDD.first()

6

In [33]:
# take(n): the first n elements in RDD order, unsorted.
intRDD.take(3)

[6, 7, 1]

In [34]:
# takeOrdered(n): the n smallest elements, in ascending order.

intRDD.takeOrdered(3)

[0, 1, 2]

In [35]:
# With key=-x the ordering is reversed, so this returns the 3 largest elements
# in descending order.

intRDD.takeOrdered(3, key = lambda x: -x)

[7, 6, 2]

# Statistics 

In [36]:
# Summary statistics. stats() bundles count/mean/stdev/max/min; each call below
# is a separate action over the RDD, printed in this order:
# stats, min, max, stdev, sum, mean.
# Note: stdev() is the *population* standard deviation (divides by N):
# sqrt(38.8 / 5) = 2.7857 for [6, 7, 1, 2, 0]. sampleStdev() divides by N - 1.
for compute in (intRDD.stats, intRDD.min, intRDD.max,
                intRDD.stdev, intRDD.sum, intRDD.mean):
    print(compute())

(count: 5, mean: 3.2, stdev: 2.78567765544, max: 7.0, min: 0.0)
0
7
2.78567765544
16
3.2


# Key-Value Transformation

In [37]:
# A pair RDD: every element is a (key, value) 2-tuple.
kvRDD1 = sc.parallelize([(1,2),(3,6),(5,6),(0,9)])
kvRDD1.collect()

[(1, 2), (3, 6), (5, 6), (0, 9)]

In [38]:
# keys(): the first item of each pair, in RDD order.
kvRDD1.keys().collect()

[1, 3, 5, 0]

In [39]:
# values(): the second item of each pair; duplicates are kept (6 appears twice).
kvRDD1.values().collect()

[2, 6, 6, 9]

# Filter with key

In [40]:
# Keep the pairs whose key (index 0) is below 3.
kvRDD1.filter(lambda pair: pair[0] < 3).collect()

[(1, 2), (0, 9)]

# Filter with Value

In [41]:
# Keep the pairs whose value (index 1) is below 3.
kvRDD1.filter(lambda pair: pair[1] < 3).collect()

[(1, 2)]

# Sort by key

In [42]:
# Sort the pairs by key; ascending order is sortByKey's default.
kvRDD1.sortByKey().collect()

[(0, 9), (1, 2), (3, 6), (5, 6)]

# Reduce by key **

In [43]:
# Pair RDD with repeated keys: 1 and 3 each appear twice.
kvRDD2 = sc.parallelize([(1,2),(1,3),(5,6),(3,9),(3,1)])
kvRDD2.collect()

[(1, 2), (1, 3), (5, 6), (3, 9), (3, 1)]

In [44]:
# reduceByKey merges all values that share a key with the given function, here
# addition: (1, 2+3), (5, 6), (3, 9+1). Spark requires the function to be
# associative and commutative, because values may be combined in any order.

kvRDD2.reduceByKey(lambda x,y : x+y).collect()

[(1, 5), (5, 6), (3, 10)]

# Key-value transformations on multiple RDDs

In [45]:
# Left-hand pair RDD (keys 1, 5, 3) and a one-pair right-hand RDD (key 3).
kvRDD3 = sc.parallelize([(1,2),(1,3),(5,6),(3,9),(3,1)])
kvRDD4 = sc.parallelize([(3,8)])

In [46]:
# Inner join on key: only keys present in both RDDs (here 3). Each match yields
# (key, (left_value, right_value)).

kvRDD3.join(kvRDD4).collect()

[(3, (9, 8)), (3, (1, 8))]

In [47]:
# Left outer join: keeps every pair of kvRDD3; keys missing from kvRDD4 get None
# as the right-hand value.

kvRDD3.leftOuterJoin(kvRDD4).collect()

[(1, (2, None)), (1, (3, None)), (3, (9, 8)), (3, (1, 8)), (5, (6, None))]

In [48]:
# Right outer join: keeps every pair of kvRDD4; keys missing from kvRDD3 would get
# None as the left-hand value. Every key of kvRDD4 matches here, so the result
# equals the inner join.

kvRDD3.rightOuterJoin(kvRDD4).collect()

[(3, (9, 8)), (3, (1, 8))]

In [49]:
# subtractByKey: keep the pairs of kvRDD3 whose key does not occur in kvRDD4
# (drops both key-3 pairs).

kvRDD3.subtractByKey(kvRDD4).collect()

[(1, 2), (1, 3), (5, 6)]

# Key-Value Action

In [50]:
# first() on a pair RDD returns the first (key, value) tuple.
kvRDD3.first()

(1, 2)

In [51]:
# Index 0 of the pair is the key (each first() call is a separate action).
kvRDD3.first()[0]

1

In [52]:
# Index 1 of the pair is the value.
kvRDD3.first()[1]

2

In [53]:
# take(2): the first two pairs, in RDD order.
kvRDD3.take(2)

[(1, 2), (1, 3)]

# Count by key

In [54]:
# Action: the number of pairs per key, returned as a defaultdict.
kvRDD3.countByKey()

defaultdict(int, {1: 2, 3: 2, 5: 1})

# Collect as map

In [55]:
# Caution: collectAsMap keeps only ONE value per key. kvRDD3 has two pairs for
# keys 1 and 3, but the dict below holds a single value for each (2 and 9 are lost).
KV = kvRDD3.collectAsMap()
KV

{1: 3, 3: 1, 5: 6}

In [56]:
type(KV)

dict

In [58]:
# lookup(key): every value stored under that key, as a list.

kvRDD3.lookup(5)

[6]

# Broadcast

In [69]:
# Lookup table of fruit id -> fruit name, as a pair RDD.
kvFruit = sc.parallelize([(1,"apple"),(2,"banana"),(3,"peach"),(4,"grape")])

In [71]:
# Bring the small table back as a plain dict.
fruitMap = kvFruit.collectAsMap()
fruitMap

{1: 'apple', 2: 'banana', 3: 'peach', 4: 'grape'}

In [82]:
# Wrap the dict in a Spark broadcast variable; tasks read it via `bcFruitMap.value`.

bcFruitMap = sc.broadcast(fruitMap)

In [76]:
# The ids to translate into fruit names.
fruitIds = sc.parallelize([2,4,1,3])
fruitIds.collect()

[2, 4, 1, 3]

In [84]:
# Translate each id into its fruit name by reading the broadcast dict via .value.
fruitNames = (
    fruitIds
    .map(lambda fruit_id: bcFruitMap.value[fruit_id])
    .collect()
)
fruitNames

['banana', 'grape', 'apple', 'peach']

# Accumulator

In [85]:
# TODO: accumulator example not written yet (placeholder cell).

# RDD Persistence 

In [86]:
# TODO: RDD persistence (cache/persist) example not written yet (placeholder cell).

# Case study: word count

In [110]:
# fruit.txt is a plain-text file with lines such as:
#   apple apple orange
#   banana grape grape
# The path is relative; the file is assumed to sit in the working directory — confirm.
textfile = sc.textFile("fruit.txt")

In [111]:
# One string element per line of the file.
textfile.collect()

['apple apple orange', 'banana grape grape']

In [112]:
# map() is one-to-one: each line becomes ONE list of words, so this is an RDD of
# lists. It gets its own name instead of reusing `stringRDD`, which the flatMap
# cell below binds to the flat RDD of words that later cells rely on.
# split() with no argument splits on any whitespace run and never yields ''.
lineWordsRDD = textfile.map(lambda line: line.split())
lineWordsRDD.collect()

[['apple', 'apple', 'orange'], ['banana', 'grape', 'grape']]

In [113]:
# flatMap() flattens: every word of every line becomes its own element.
# split() with no argument splits on any run of whitespace and drops empty strings.
# split(" ") would turn doubled spaces or trailing blanks into '' "words", which
# the word count below would then count.
stringRDD = textfile.flatMap(lambda line: line.split())
stringRDD.collect()

['apple', 'apple', 'orange', 'banana', 'grape', 'grape']

In [115]:
# Turn each word into a (word, 1) pair, ready to be summed per key.
stringRDD.map(lambda word: (word, 1)).collect()

[('apple', 1),
 ('apple', 1),
 ('orange', 1),
 ('banana', 1),
 ('grape', 1),
 ('grape', 1)]

In [116]:
# Word count: pair every word with 1, then add the 1s up per word.
# The order of the resulting (word, count) pairs is not the input order.
(
    stringRDD
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

[('orange', 1), ('banana', 1), ('apple', 2), ('grape', 2)]