<a>
<img align=left src="files/images/pyspark-pictures-dataframes-page1.svg" width=500 height=250>
</a>

* [RDD API](http://nbviewer.ipython.org/github/jkthompson/pyspark-pictures/blob/master/pyspark-pictures.ipynb)
* [GitHub](https://github.com/jkthompson/pyspark-pictures)
* [related blog post](http://data-frack.blogspot.com/2015/01/visual-mnemonics-for-pyspark-api.html)

<a>
<img align=left src="files/images/pyspark-pictures-dataframes-page2.svg" width=500 height=500>
</a>

# Click on a picture to view pyspark docs

In [1]:
# versions
# Record the Spark and IPython versions the outputs in this notebook were produced with.
# NOTE(review): `sc` (and `sqlContext` in later cells) is never created in this
# notebook; presumably the kernel was started through the pyspark shell, which
# predefines both -- confirm before running on a plain Python kernel.
import IPython
print("pyspark version:" + str(sc.version))
print("Ipython version:" + str(IPython.__version__))

pyspark version:1.6.1
Ipython version:4.2.0


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.agg">
<img align=left src="files/images/pyspark-pictures-dataframes-page3.svg" width=500 height=500 />
</a>

In [2]:
# agg
# Aggregate over the whole DataFrame (no grouping). The dict maps a column name
# to the aggregate to apply; y is a one-row DataFrame with a column avg(amt).
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.agg({"amt":"avg"})
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------------------+
|           avg(amt)|
+-------------------+
|0.20000000000000004|
+-------------------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.alias">
<img align=left src="files/images/pyspark-pictures-dataframes-page4.svg" width=750 height=750 />
</a>

In [3]:
# alias
# alias() returns the same rows under a name, so columns can then be addressed
# with a qualified "transactions.<column>" reference, as in the final select.
from pyspark.sql.functions import col
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.alias('transactions')
x.show()
y.show()
y.select(col("transactions.to")).show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+
|   to|
+-----+
|  Bob|
|Carol|
| Dave|
+-----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cache">
<img align=left src="files/images/pyspark-pictures-dataframes-page5.svg" width=750 height=750 />
</a>

In [4]:
# cache
# cache() is called for its effect on x (its return value is not used). Both
# counts below print the same number; the comments explain how they differ.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.cache()
print(x.count()) # first action materializes x in memory
print(x.count()) # later actions avoid IO overhead

3
3


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce">
<img align=left src="files/images/pyspark-pictures-dataframes-page6.svg" width=750 height=750 />
</a>

In [5]:
# coalesce
# Start from an RDD explicitly split into 2 partitions so the reduction is
# visible: y reports 1 partition while x still reports 2.
x_rdd = sc.parallelize([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)],2)
x = sqlContext.createDataFrame(x_rdd, ['from','to','amt'])
y = x.coalesce(numPartitions=1)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())

2
1


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.collect">
<img align=left src="files/images/pyspark-pictures-dataframes-page7.svg" width=750 height=750 />
</a>

In [6]:
# collect
# Bring every row back to the driver; y is a plain Python list of Row objects
# (see the printed value), not a DataFrame.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.collect() # creates list of rows on driver
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.columns">
<img align=left src="files/images/pyspark-pictures-dataframes-page8.svg" width=500 height=500 />
</a>

In [7]:
# columns
# columns is an attribute (no call): a Python list of the column names, in
# the same order as they appear in the DataFrame.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.columns #creates list of column names on driver
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

['from', 'to', 'amt']


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.corr">
<img align=left src="files/images/pyspark-pictures-dataframes-page9.svg" width=500 height=500 />
</a>

In [8]:
# corr
# Correlation between two numeric columns. The result is a single Python
# number on the driver (printed below), not a DataFrame.
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.corr(col1="amt",col2="fee")
x.show()
print(y)

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.866025403784


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.count">
<img align=left src="files/images/pyspark-pictures-dataframes-page10.svg" width=500 height=500 />
</a>

In [9]:
# count
# count() returns the number of rows in x as a plain Python number.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
print(x.count())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

3


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cov">
<img align=left src="files/images/pyspark-pictures-dataframes-page11.svg" width=500 height=500 />
</a>

In [10]:
# cov
# Covariance between two numeric columns, returned as a single Python number.
# For these three rows 0.00095 is the sample covariance (n-1 denominator).
x = sqlContext.createDataFrame([("Alice","Bob",0.1,0.001),("Bob","Carol",0.2,0.02),("Carol","Dave",0.3,0.02)], ['from','to','amt','fee'])
y = x.cov(col1="amt",col2="fee")
x.show()
print(y)

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

0.00095


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.crosstab">
<img align=left src="files/images/pyspark-pictures-dataframes-page12.svg" width=500 height=500 />
</a>

In [11]:
# crosstab
# Pair-wise frequency table: one row per distinct 'from' value, one column per
# distinct 'to' value, each cell counting how often that pair occurs in x.
# The first column of the result is named "<col1>_<col2>" (here from_to).
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.crosstab(col1='from',col2='to')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+----+-----+---+
|from_to|Dave|Carol|Bob|
+-------+----+-----+---+
|    Bob|   0|    1|  0|
|  Alice|   0|    0|  1|
|  Carol|   1|    0|  0|
+-------+----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.cube">
<img align=left src="files/images/pyspark-pictures-dataframes-page13.svg" width=500 height=500 />
</a>

In [12]:
# cube
# cube() groups by every combination of the listed columns, so the aggregates
# below also contain subtotal rows; a null in a grouping column stands for
# "all values of that column" (e.g. the null/null row is the grand total).
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Alice","Carol",0.2)], ['from','to','amt'])
y = x.cube('from','to')
x.show()
print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
y.sum().show() 
y.max().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5383bcc50>
+-----+-----+-------------------+
| from|   to|           sum(amt)|
+-----+-----+-------------------+
|Alice|Carol|                0.2|
|Alice|  Bob|                0.1|
|Alice| null|0.30000000000000004|
| null|Carol|                0.2|
| null|  Bob|                0.1|
| null| null|0.30000000000000004|
+-----+-----+-------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|Carol|     0.2|
|Alice|  Bob|     0.1|
|Alice| null|     0.2|
| null|Carol|     0.2|
| null|  Bob|     0.1|
| null| null|     0.2|
+-----+-----+--------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.describe">
<img align=left src="files/images/pyspark-pictures-dataframes-page14.svg" width=500 height=500 />
</a>

In [13]:
# describe
# Summary statistics (count, mean, stddev, min, max) returned as a DataFrame.
# In the output below only the numeric column amt is summarised.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.describe().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-------+-------------------+
|summary|                amt|
+-------+-------------------+
|  count|                  3|
|   mean|0.20000000000000004|
| stddev|0.09999999999999998|
|    min|                0.1|
|    max|                0.3|
+-------+-------------------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.distinct">
<img align=left src="files/images/pyspark-pictures-dataframes-page15.svg" width=500 height=500 />
</a>

In [14]:
# distinct
# Remove rows that are identical in every column (the repeated Bob/Carol/0.2).
# The result's row order is not the input order, as the second table shows.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.distinct()
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.drop">
<img align=left src="files/images/pyspark-pictures-dataframes-page16.svg" width=500 height=500 />
</a>

In [15]:
# drop
# Return a new DataFrame without the named column; x itself still has amt,
# as the first table (shown after the drop call) demonstrates.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.drop('amt')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+
| from|   to|
+-----+-----+
|Alice|  Bob|
|  Bob|Carol|
|Carol| Dave|
+-----+-----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropDuplicates">
<img align=left src="files/images/pyspark-pictures-dataframes-page17.svg" width=500 height=500 />
</a>

In [16]:
# dropDuplicates / drop_duplicates
# De-duplicate on a subset of columns: one row survives per (from, to) pair,
# even though the three Bob/Carol rows do not all share the same amt.
x = sqlContext.createDataFrame([("Alice","Bob",0.1),("Bob","Carol",0.2),("Bob","Carol",0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropDuplicates(subset=['from','to'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dropna">
<img align=left src="files/images/pyspark-pictures-dataframes-page18.svg" width=500 height=500 />
</a>

In [17]:
# dropna
# Drop rows holding a null in any of the subset columns. Nulls outside the
# subset are ignored, which is why the Bob/Carol row with a null amt is kept.
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.dropna(how='any',subset=['from','to'])
x.show()
y.show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

+----+-----+----+
|from|   to| amt|
+----+-----+----+
| Bob|Carol|null|
| Bob|Carol| 0.2|
+----+-----+----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dtypes">
<img align=left src="files/images/pyspark-pictures-dataframes-page19.svg" width=500 height=500 />
</a>

In [18]:
# dtypes
# dtypes is an attribute (no call): a list of (column name, type name) pairs,
# both given as strings.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.dtypes
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[('from', 'string'), ('to', 'string'), ('amt', 'double')]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.explain">
<img align=left src="files/images/pyspark-pictures-dataframes-page20.svg" width=500 height=500 />
</a>

In [19]:
# explain
# With extended=True the parsed, analyzed and optimized logical plans are
# printed in addition to the physical plan. explain() prints its report
# directly; its return value is not used here.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.agg({"amt":"avg"}).explain(extended = True)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

== Parsed Logical Plan ==
'Aggregate ['avg(amt#296) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Analyzed Logical Plan ==
avg(amt): double
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Optimized Logical Plan ==
Aggregate [(avg(amt#296),mode=Complete,isDistinct=false) AS avg(amt)#297]
+- Project [amt#296]
   +- LogicalRDD [from#294,to#295,amt#296], MapPartitionsRDD[194] at applySchemaToPythonRDD at null:-1

== Physical Plan ==
TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Final,isDistinct=false)], output=[avg(amt)#297])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(avg(amt#296),mode=Partial,isDistinct=false)]

<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.fillna">
<img align=left src="files/images/pyspark-pictures-dataframes-page21.svg" width=500 height=500 />
</a>

In [20]:
# fillna
# Replace nulls with a constant, restricted to the subset columns; the null in
# amt stays null because amt is not listed in subset.
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3)], ['from','to','amt'])
y = x.fillna(value='unknown',subset=['from','to'])
x.show()
y.show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
+-----+-----+----+

+-------+-------+----+
|   from|     to| amt|
+-------+-------+----+
|unknown|    Bob| 0.1|
|    Bob|  Carol|null|
|  Carol|unknown| 0.3|
+-------+-------+----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter">
<img align=left src="files/images/pyspark-pictures-dataframes-page22.svg" width=500 height=500 />
</a>

In [21]:
# filter
# The condition is given as a SQL expression string; rows for which it does
# not hold (here the one with amt 0.1) are absent from y.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.filter("amt > 0.1")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.first">
<img align=left src="files/images/pyspark-pictures-dataframes-page23.svg" width=500 height=500 />
</a>

In [22]:
# first
# Return only the first row, as a single Row object on the driver (not a
# DataFrame and not a list).
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.first()
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

Row(from=u'Alice', to=u'Bob', amt=0.1)


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.flatMap">
<img align=left src="files/images/pyspark-pictures-dataframes-page24.svg" width=500 height=500 />
</a>

In [23]:
# flatMap
# The function returns a tuple per row and flatMap concatenates those tuples,
# so y holds one flat sequence of values rather than a list of pairs.
# NOTE(review): DataFrame.flatMap appears to be a Spark 1.x API (this notebook
# was run on 1.6.1); on 2.x+ this likely needs x.rdd.flatMap -- confirm.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.flatMap(lambda x: (x[0],x[2])) 
print(y) # implicit conversion to RDD
y.collect()

PythonRDD[227] at RDD at PythonRDD.scala:43


[u'Alice', 0.1, u'Bob', 0.2, u'Carol', 0.3]

<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.foreach">
<img align=left src="files/images/pyspark-pictures-dataframes-page25.svg" width=500 height=500 />
</a>

In [24]:
# foreach
from __future__ import print_function

# setup
# Scratch file the foreach example appends to; the path is relative to the
# current working directory.
fn = './foreachExampleDataFrames.txt' 
open(fn, 'w').close()  # clear the file
def fappend(el,f):
    '''Append the printed form of el, followed by a newline, to the file at path f.

    The file is opened in append mode on every call and closed again before
    returning, so repeated calls do not leave file handles open.
    '''
    # The context manager guarantees the handle is flushed and closed even if
    # print() raises; the original relied on garbage collection to do that.
    with open(f, 'a+') as out:
        print(el, file=out)

# example
# foreach runs fappend once per row purely for its side effect, which is why
# y is None. The rows land in the file in a different order from x (see the
# output below).
# NOTE(review): reading the file back on the driver presumably works only
# because the workers share the driver's filesystem (local mode) -- confirm.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.foreach(lambda x: fappend(x,fn)) # writes into foreachExampleDataFrames.txt
x.show() # original dataframe
print(y) # foreach returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print (foreachExample.read())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
Row(from=u'Carol', to=u'Dave', amt=0.3)
Row(from=u'Bob', to=u'Carol', amt=0.2)
Row(from=u'Alice', to=u'Bob', amt=0.1)



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.foreachPartition">
<img align=left src="files/images/pyspark-pictures-dataframes-page26.svg" width=500 height=500 />
</a>

In [25]:
# foreachPartition
from __future__ import print_function

#setup
# Scratch file the foreachPartition example appends to; the path is relative
# to the current working directory.
fn = './foreachPartitionExampleDataFrames.txt'
open(fn, 'w').close()  # clear the file
def fappend(partition,f):
    '''Append all elements of partition, printed as one list, to the file at path f.

    partition may be any iterable (it is consumed once). One line is written
    per call, and the file is closed again before returning.
    '''
    # The context manager closes the handle deterministically; the original
    # left the file object open until it was garbage collected.
    with open(f, 'a+') as out:
        print(list(partition), file=out)

# foreachPartition calls fappend once per partition (with an iterator over
# that partition's rows) rather than once per row. In the run shown, one
# partition was empty and the other held all three rows.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
y = x.foreachPartition(lambda x: fappend(x,fn)) # writes into foreachPartitionExampleDataFrames.txt

x.show() # original dataframe
print(y) # foreachPartition returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print (foreachExample.read())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

None
[]
[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.freqItems">
<img align=left src="files/images/pyspark-pictures-dataframes-page27.svg" width=500 height=500 />
</a>

In [26]:
# freqItems
# Report, for each listed column, the values considered frequent at the given
# support threshold. The result is a single row with one <column>_freqItems
# array per requested column.
# NOTE(review): 'Alice' is reported although she appears in 3 of 5 rows; the
# underlying algorithm is presumably approximate (false positives) -- confirm.
x = sqlContext.createDataFrame(
    [
        ("Bob", "Carol", 0.1),
        ("Alice", "Dave", 0.1),
        ("Alice", "Bob", 0.1),
        ("Alice", "Bob", 0.5),
        ("Carol", "Bob", 0.1),
    ],
    ['from', 'to', 'amt'])
y = x.freqItems(cols=['from', 'amt'], support=0.8)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.1|
|Alice| Dave|0.1|
|Alice|  Bob|0.1|
|Alice|  Bob|0.5|
|Carol|  Bob|0.1|
+-----+-----+---+

+--------------+-------------+
|from_freqItems|amt_freqItems|
+--------------+-------------+
|       [Alice]|        [0.1]|
+--------------+-------------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page28.svg" width=500 height=500 />
</a>

In [27]:
# groupBy
# groupBy on its own returns a GroupedData handle (printed below), not a
# DataFrame; y has no rows to show until an aggregation is applied to it.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from')
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc53831f5d0>


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page29.svg" width=500 height=500 />
</a>

In [28]:
# groupBy(col1).avg(col2)
# Group on one column and average another: y has one row per distinct 'from'
# value and a column named avg(amt).
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Alice","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.groupBy('from').avg('amt')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-------------------+
| from|           avg(amt)|
+-----+-------------------+
|Carol|                0.3|
|Alice|0.15000000000000002|
+-----+-------------------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.head">
<img align=left src="files/images/pyspark-pictures-dataframes-page30.svg" width=500 height=500 />
</a>

In [29]:
# head
# head(n) returns the first n rows to the driver as a Python list of Row
# objects (see the printed value), not as a DataFrame.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.head(2)
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.intersect">
<img align=left src="files/images/pyspark-pictures-dataframes-page31.svg" width=500 height=500 />
</a>

In [30]:
# intersect
# Keep only the rows that appear in both DataFrames. A row must match in
# every column, so only Alice/Bob/0.1 survives here.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Alice",0.2),("Carol","Dave",0.1)], ['from','to','amt'])
z = x.intersect(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Alice|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+---+---+
| from| to|amt|
+-----+---+---+
|Alice|Bob|0.1|
+-----+---+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.isLocal">
<img align=left src="files/images/pyspark-pictures-dataframes-page32.svg" width=500 height=500 />
</a>

In [31]:
# isLocal
# isLocal() returns a plain Python bool; it was False for this DataFrame in
# the run whose output is shown below.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.isLocal()
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

False


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join">
<img align=left src="files/images/pyspark-pictures-dataframes-page33.svg" width=500 height=500 />
</a>

In [32]:
# join
# Inner join on x.to == y.name, followed by a select of the columns to keep.
# The Bob -> Carol row is missing from z because y has no entry named Carol.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Alice',20),("Bob",40),("Dave",80)], ['name','age'])
z = x.join(y,x.to == y.name,'inner').select('from','to','amt','age')
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| name|age|
+-----+---+
|Alice| 20|
|  Bob| 40|
| Dave| 80|
+-----+---+

+-----+----+---+---+
| from|  to|amt|age|
+-----+----+---+---+
|Carol|Dave|0.3| 80|
|Alice| Bob|0.1| 40|
+-----+----+---+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.limit">
<img align=left src="files/images/pyspark-pictures-dataframes-page34.svg" width=500 height=500 />
</a>

In [33]:
# limit
# limit(n) returns a new DataFrame (y is displayed with .show()) holding at
# most n rows of x.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.limit(2)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.map">
<img align=left src="files/images/pyspark-pictures-dataframes-page35.svg" width=500 height=500 />
</a>

In [34]:
# map
# Apply a function to every Row; fields can be read by name inside the
# function (x.amt in the lambda, where x is the Row, not the DataFrame).
# NOTE(review): DataFrame.map appears to be a Spark 1.x API (this notebook was
# run on 1.6.1); on 2.x+ this likely needs x.rdd.map -- confirm.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.map(lambda x: x.amt+1)
x.show()
print(y.collect())  # output is RDD

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[1.1, 1.2, 1.3]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapPartitions">
<img align=left src="files/images/pyspark-pictures-dataframes-page36.svg" width=500 height=500 />
</a>

In [35]:
# mapPartitions
def amt_sum(partition):
    '''Yield a single value: the sum of the amt attribute over every element of partition.

    An empty partition yields 0.
    '''
    amounts = (row.amt for row in partition)
    yield sum(amounts)
    
# mapPartitions calls amt_sum once per partition, so y has one element per
# partition. repartition(2) gives two partitions, but in the run shown all
# three rows ended up in one of them, so the other partition sums to 0.
# NOTE(review): DataFrame.mapPartitions appears to be a Spark 1.x API; on 2.x+
# this likely needs x.rdd.mapPartitions -- confirm.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x = x.repartition(2) # force 2 partitions
y = x.mapPartitions(lambda p: amt_sum(p))
x.show()
print(x.rdd.glom().collect()) # glom() gathers the elements of each partition into one list per partition
print(y.collect())
print(y.glom().collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)], []]
[0.6000000000000001, 0]
[[0.6000000000000001], [0]]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.na">
<img align=left src="files/images/pyspark-pictures-dataframes-page37.svg" width=500 height=500 />
</a>

In [36]:
# na
# The three calls below, read against their outputs:
#   drop()      keeps only rows without any null (just Bob/Carol/0.2 here);
#   fill(dict)  fills each named column with its own replacement value;
#   fill(0)     only changed the numeric column amt -- the nulls in the
#               string columns from/to are still null in the last table.
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.na  # returns an object for handling missing values, supports drop, fill, and replace methods
x.show()
print(y)
y.drop().show()
y.fill({'from':'unknown','to':'unknown','amt':0}).show()
y.fill(0).show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
| null|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| null| 0.3|
|  Bob|Carol| 0.2|
+-----+-----+----+

<pyspark.sql.dataframe.DataFrameNaFunctions object at 0x7fc538392b90>
+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+

+-------+-------+---+
|   from|     to|amt|
+-------+-------+---+
|unknown|    Bob|0.1|
|    Bob|  Carol|0.0|
|  Carol|unknown|0.3|
|    Bob|  Carol|0.2|
+-------+-------+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
| null|  Bob|0.1|
|  Bob|Carol|0.0|
|Carol| null|0.3|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.orderBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page38.svg" width=500 height=500 />
</a>

In [37]:
# orderBy
# Sort by the listed columns. ascending is given as a list paired with the
# column list, so [False] sorts 'from' in descending order (Carol, Bob, Alice).
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.orderBy(['from'],ascending=[False])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol| Dave|0.3|
|  Bob|Carol|0.2|
|Alice|  Bob|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.persist">
<img align=left src="files/images/pyspark-pictures-dataframes-page39.svg" width=500 height=500 />
</a>

In [38]:
# persist
# StorageLevel is imported explicitly so this cell does not depend on the
# launching shell having pre-imported it; without the import a kernel that
# only provides sc/sqlContext fails here with a NameError.
from pyspark import StorageLevel

x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
# StorageLevel(useDisk,useMemory,useOffHeap,deserialized,replication=1):
# disk and memory enabled, off-heap disabled, deserialized, one replica.
x.persist(storageLevel=StorageLevel(True,True,False,True,1))
x.show()
# Last expression of the cell, so its value (True) is displayed as the cell result.
x.is_cached

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



True

<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.printSchema">
<img align=left src="files/images/pyspark-pictures-dataframes-page40.svg" width=500 height=500 />
</a>

In [39]:
# printSchema
# Print the schema as a tree: one line per column with its name, type and
# nullability. The types were inferred from the Python values given above.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.show()
x.printSchema()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

root
 |-- from: string (nullable = true)
 |-- to: string (nullable = true)
 |-- amt: double (nullable = true)



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit">
<img align=left src="files/images/pyspark-pictures-dataframes-page41.svg" width=500 height=500 />
</a>

In [40]:
# randomSplit
# Split x into a list of DataFrames, one per weight. The weights are not exact
# proportions: in the run shown, y[0] is empty and y[1] holds all three rows.
# NOTE(review): no seed is passed, so re-running presumably gives a different
# split; pass a seed if reproducible output is wanted -- confirm against the API.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.randomSplit([0.5,0.5])
x.show()
y[0].show()
y[1].show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+---+---+
|from| to|amt|
+----+---+---+
+----+---+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.rdd">
<img align=left src="files/images/pyspark-pictures-dataframes-page42.svg" width=500 height=500 />
</a>

In [41]:
# rdd
# rdd is an attribute (no call) exposing the DataFrame's content as an RDD;
# collecting it gives a list of Row objects, as printed below.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rdd
x.show()
print(y.collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2), Row(from=u'Carol', to=u'Dave', amt=0.3)]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.registerTempTable">
<img align=left src="files/images/pyspark-pictures-dataframes-page43.svg" width=500 height=500 />
</a>

In [42]:
# registerTempTable
# Register x under a table name so it can be queried with SQL text through
# sqlContext.sql; y is the DataFrame produced by that query.
# NOTE(review): registerTempTable is reportedly deprecated from Spark 2.0 in
# favour of createOrReplaceTempView -- confirm for the targeted version.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
x.registerTempTable(name="TRANSACTIONS")
y = sqlContext.sql('SELECT * FROM TRANSACTIONS WHERE amt > 0.1')
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.repartition">
<img align=left src="files/images/pyspark-pictures-dataframes-page44.svg" width=500 height=500 />
</a>

In [43]:
# repartition
# repartition(n) returns a new DataFrame with n partitions (3 for y); x keeps
# its own count (4 in this run -- presumably the context's default
# parallelism, so the first number may differ on another machine).
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.repartition(3)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())

4
3


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace">
<img align=left src="files/images/pyspark-pictures-dataframes-page45.svg" width=500 height=500 />
</a>

In [44]:
# replace
# Swap one value for another, looking only at the listed columns; here the
# single 'Dave' in column 'to' becomes 'David' and x itself is unchanged.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.replace('Dave','David',['from','to'])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|David|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.rollup">
<img align=left src="files/images/pyspark-pictures-dataframes-page46.svg" width=500 height=500 />
</a>

In [45]:
# rollup
# rollup() aggregates along the column hierarchy: per (from, to) pair, per
# 'from' value (to is null), and one grand total (both null). The output has
# no rows where only 'from' is null.
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.rollup(['from','to'])
x.show()
print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
y.sum().show()
y.max().show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<pyspark.sql.group.GroupedData object at 0x7fc5382f7850>
+-----+-----+------------------+
| from|   to|          sum(amt)|
+-----+-----+------------------+
|Alice|  Bob|               0.1|
|  Bob|Carol|               0.2|
|Alice| null|               0.1|
|Carol| Dave|               0.3|
|  Bob| null|               0.2|
|Carol| null|               0.3|
| null| null|0.6000000000000001|
+-----+-----+------------------+

+-----+-----+--------+
| from|   to|max(amt)|
+-----+-----+--------+
|Alice|  Bob|     0.1|
|  Bob|Carol|     0.2|
|Alice| null|     0.1|
|Carol| Dave|     0.3|
|  Bob| null|     0.2|
|Carol| null|     0.3|
| null| null|     0.3|
+-----+-----+--------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sample">
<img align=left src="files/images/pyspark-pictures-dataframes-page47.svg" width=500 height=500 />
</a>

In [46]:
# sample
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Sample without replacement; 0.5 is the per-row fraction, so the number of rows
# returned is not guaranteed to be exactly half.
# A fixed seed is passed so that re-running the notebook on the same setup picks
# the same rows (without it every run draws a fresh random sample).
# NOTE(review): the selected rows may still depend on how x is partitioned.
y = x.sample(False, 0.5, seed=42)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sampleBy">
<img align=left src="files/images/pyspark-pictures-dataframes-page48.svg" width=500 height=500 />
</a>

In [47]:
# sampleBy
# Six transactions: four sent by Alice and two sent by Bob.
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Alice", "Carol", 0.2), ("Alice", "Alice", 0.3),
     ("Alice", "Dave", 0.4), ("Bob", "Bob", 0.5), ("Bob", "Carol", 0.6)],
    ["from", "to", "amt"])
# Stratified sample keyed on 'from': keep roughly 10% of Alice's rows and 90% of Bob's.
# A fixed seed is passed so that re-running the notebook on the same setup picks
# the same rows (without it every run draws a fresh random sample).
# NOTE(review): the selected rows may still depend on how x is partitioned.
y = x.sampleBy(col="from", fractions={"Alice": 0.1, "Bob": 0.9}, seed=42)
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|Alice|Carol|0.2|
|Alice|Alice|0.3|
|Alice| Dave|0.4|
|  Bob|  Bob|0.5|
|  Bob|Carol|0.6|
+-----+-----+---+

+----+-----+---+
|from|   to|amt|
+----+-----+---+
| Bob|  Bob|0.5|
| Bob|Carol|0.6|
+----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.schema">
<img align=left src="files/images/pyspark-pictures-dataframes-page49.svg" width=500 height=500 />
</a>

In [48]:
# schema
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# schema is an attribute, not a method call; printing it shows a StructType
# listing each column's name, type and nullability.
y = x.schema
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select">
<img align=left src="files/images/pyspark-pictures-dataframes-page50.svg" width=500 height=500 />
</a>

In [49]:
# select
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Project down to the 'from' and 'amt' columns; 'to' is dropped from y.
y = x.select(["from", "amt"])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+---+
| from|amt|
+-----+---+
|Alice|0.1|
|  Bob|0.2|
|Carol|0.3|
+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.selectExpr">
<img align=left src="files/images/pyspark-pictures-dataframes-page51.svg" width=500 height=500 />
</a>

In [50]:
# selectExpr
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Each string is a SQL expression evaluated per row:
# the first character of 'from', and 'amt' shifted up by 10.
y = x.selectExpr(["substr(from,1,1)", "amt+10"])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+----------------+----------+
|substr(from,1,1)|(amt + 10)|
+----------------+----------+
|               A|      10.1|
|               B|      10.2|
|               C|      10.3|
+----------------+----------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.show">
<img align=left src="files/images/pyspark-pictures-dataframes-page52.svg" width=500 height=500 />
</a>

In [51]:
# show
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Print the frame's rows as an ASCII table.
x.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sort">
<img align=left src="files/images/pyspark-pictures-dataframes-page53.svg" width=500 height=500 />
</a>

In [52]:
# sort
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Alice", 0.3)],
    ["from", "to", "amt"])
# Order the rows by the 'to' column (Alice, Bob, Carol in the output below).
y = x.sort(["to"])
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Carol|Alice|0.3|
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.sortWithinPartitions">
<img align=left src="files/images/pyspark-pictures-dataframes-page54.svg" width=500 height=500 />
</a>

In [53]:
# sortWithinPartitions
# The extra 'p_id' column drives the partitioning: rows are spread over
# 2 partitions according to their p_id value.
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1, 1), ("Bob", "Carol", 0.2, 2), ("Carol", "Alice", 0.3, 2)],
    ["from", "to", "amt", "p_id"]).repartition(2, "p_id")
# Rows are ordered by 'to' inside each partition; they are not moved between partitions.
y = x.sortWithinPartitions(["to"])
x.show()
y.show()
# glom() gathers the rows of each partition into one list per partition,
# so the printed outer list has one inner list for every partition.
print(x.rdd.glom().collect())
print(y.rdd.glom().collect())

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|  Bob|Carol|0.2|   2|
|Carol|Alice|0.3|   2|
+-----+-----+---+----+

+-----+-----+---+----+
| from|   to|amt|p_id|
+-----+-----+---+----+
|Alice|  Bob|0.1|   1|
|Carol|Alice|0.3|   2|
|  Bob|Carol|0.2|   2|
+-----+-----+---+----+

[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2), Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2)]]
[[Row(from=u'Alice', to=u'Bob', amt=0.1, p_id=1)], [Row(from=u'Carol', to=u'Alice', amt=0.3, p_id=2), Row(from=u'Bob', to=u'Carol', amt=0.2, p_id=2)]]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.stat">
<img align=left src="files/images/pyspark-pictures-dataframes-page55.svg" width=500 height=500 />
</a>

In [54]:
# stat
# Same transactions as the other examples, plus a numeric 'fee' column to correlate with 'amt'.
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1, 0.001), ("Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)],
    ["from", "to", "amt", "fee"])
# stat is an attribute; it hands back a DataFrameStatFunctions helper bound to x.
y = x.stat
x.show()
print(y)
# Correlation between the 'amt' and 'fee' columns.
print(y.corr(col1="amt", col2="fee"))

+-----+-----+---+-----+
| from|   to|amt|  fee|
+-----+-----+---+-----+
|Alice|  Bob|0.1|0.001|
|  Bob|Carol|0.2| 0.02|
|Carol| Dave|0.3| 0.02|
+-----+-----+---+-----+

<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x7fc5382f7a50>
0.866025403784


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.subtract">
<img align=left src="files/images/pyspark-pictures-dataframes-page56.svg" width=500 height=500 />
</a>

In [55]:
# subtract
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# y differs from x only in the last row's 'amt' (0.1 instead of 0.3).
y = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.1)],
    ["from", "to", "amt"])
# Keep the rows of x that have no identical row in y.
z = x.subtract(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+----+---+
| from|  to|amt|
+-----+----+---+
|Carol|Dave|0.3|
+-----+----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.take">
<img align=left src="files/images/pyspark-pictures-dataframes-page57.svg" width=500 height=500 />
</a>

In [56]:
# take
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Fetch the first 2 rows; y is a plain Python list of Row objects, not a DataFrame.
y = x.take(num=2)
x.show()
print(y)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

[Row(from=u'Alice', to=u'Bob', amt=0.1), Row(from=u'Bob', to=u'Carol', amt=0.2)]


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toDF">
<img align=left src="files/images/pyspark-pictures-dataframes-page58.svg" width=500 height=500 />
</a>

In [57]:
# toDF
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Same data with the columns renamed positionally: from -> seller, to -> buyer, amt -> amt.
y = x.toDF("seller", "buyer", "amt")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+------+-----+---+
|seller|buyer|amt|
+------+-----+---+
| Alice|  Bob|0.1|
|   Bob|Carol|0.2|
| Carol| Dave|0.3|
+------+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toJSON">
<img align=left src="files/images/pyspark-pictures-dataframes-page59.svg" width=500 height=500 />
</a>

In [58]:
# toJSON
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Alice", 0.3)],
    ["from", "to", "amt"])
# Each row is serialised to one JSON document string; collect() brings them
# back to the driver as a list for printing.
y = x.toJSON()
x.show()
print(y.collect())

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+

[u'{"from":"Alice","to":"Bob","amt":0.1}', u'{"from":"Bob","to":"Carol","amt":0.2}', u'{"from":"Carol","to":"Alice","amt":0.3}']


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.toPandas">
<img align=left src="files/images/pyspark-pictures-dataframes-page60.svg" width=500 height=500 />
</a>

In [59]:
# toPandas
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Convert the Spark frame into a pandas DataFrame.
y = x.toPandas()
x.show()
print(type(y))
# Bare last expression so the notebook renders the pandas frame as the cell output.
y

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,from,to,amt
0,Alice,Bob,0.1
1,Bob,Carol,0.2
2,Carol,Dave,0.3


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unionAll">
<img align=left src="files/images/pyspark-pictures-dataframes-page61.svg" width=500 height=500 />
</a>

In [60]:
# unionAll
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2)],
    ["from", "to", "amt"])
# y shares the ("Bob", "Carol", 0.2) row with x.
y = sqlContext.createDataFrame(
    [("Bob", "Carol", 0.2), ("Carol", "Dave", 0.1)],
    ["from", "to", "amt"])
# Append y's rows to x's; the shared row shows up twice in z (duplicates are kept).
z = x.unionAll(y)
x.show()
y.show()
z.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|  Bob|Carol|0.2|
|Carol| Dave|0.1|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.unpersist">
<img align=left src="files/images/pyspark-pictures-dataframes-page62.svg" width=500 height=500 />
</a>

In [61]:
# unpersist
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Mark x as cached, then run an action on it.
# NOTE(review): count() is presumably here to force the cache to be populated.
x.cache()
x.count()
x.show()
# is_cached is printed before and after unpersist() to show the flag flipping.
print(x.is_cached)
x.unpersist()
print(x.is_cached)

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

True
False


<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.where">
<img align=left src="files/images/pyspark-pictures-dataframes-page63.svg" width=500 height=500 />
</a>

In [62]:
# where (filter)
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Keep only the rows satisfying the SQL predicate given as a string.
y = x.where("amt > 0.1")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumn">
<img align=left src="files/images/pyspark-pictures-dataframes-page64.svg" width=500 height=500 />
</a>

In [63]:
# withColumn
# The middle row deliberately has no amount (None), which shows as null in the output.
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", None), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Add a boolean 'conf' column that is true wherever 'amt' is not null.
y = x.withColumn("conf", x["amt"].isNotNull())
x.show()
y.show()

+-----+-----+----+
| from|   to| amt|
+-----+-----+----+
|Alice|  Bob| 0.1|
|  Bob|Carol|null|
|Carol| Dave| 0.3|
+-----+-----+----+

+-----+-----+----+-----+
| from|   to| amt| conf|
+-----+-----+----+-----+
|Alice|  Bob| 0.1| true|
|  Bob|Carol|null|false|
|Carol| Dave| 0.3| true|
+-----+-----+----+-----+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.withColumnRenamed">
<img align=left src="files/images/pyspark-pictures-dataframes-page65.svg" width=500 height=500 />
</a>

In [64]:
# withColumnRenamed
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# Rename the single column 'amt' to 'amount'; the other columns keep their names.
y = x.withColumnRenamed("amt", "amount")
x.show()
y.show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+-----+-----+------+
| from|   to|amount|
+-----+-----+------+
|Alice|  Bob|   0.1|
|  Bob|Carol|   0.2|
|Carol| Dave|   0.3|
+-----+-----+------+



<a href="http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.write">
<img align=left src="files/images/pyspark-pictures-dataframes-page66.svg" width=500 height=500 />
</a>

In [65]:
# write
import json  # NOTE(review): the json module is not referenced anywhere in this cell
x = sqlContext.createDataFrame(
    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)],
    ["from", "to", "amt"])
# write is an attribute that hands back the frame's writer object; keep that in y.
# (The json() call itself returns nothing, so assigning its result would leave y as None.)
y = x.write
# 'overwrite' mode replaces any existing output at the target path.
y.mode('overwrite').json('./dataframeWriteExample.json')
x.show()
# read the dataframe back in from file
# (in the output below the columns come back in a different order: amt, from, to)
sqlContext.read.json('./dataframeWriteExample.json').show()

+-----+-----+---+
| from|   to|amt|
+-----+-----+---+
|Alice|  Bob|0.1|
|  Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+

+---+-----+-----+
|amt| from|   to|
+---+-----+-----+
|0.1|Alice|  Bob|
|0.2|  Bob|Carol|
|0.3|Carol| Dave|
+---+-----+-----+

