## INTRO
- Basics OP on Pyspark DataFrame

In [1]:
# OP 
import datetime as dt 
import time
import csv
import requests
import pandas as pd, numpy as np

# SPARK 
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from operator import add
from pyspark.sql.functions import countDistinct, avg, stddev, format_number

In [2]:
# config 
conf = SparkConf().setAppName("LOAD PTT MYSQL DATABASE")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

## 1) Pyspark Read csv as Spark DataFrame

In [3]:
# load the data
df_boston = sqlContext.read\
 .format('com.databricks.spark.csv')\
 .options(header='true', inferschema='true')\
 .load('boston.csv')

In [4]:
type(df_boston)

pyspark.sql.dataframe.DataFrame

In [5]:
df_boston

DataFrame[CRIM: double, ZN: double, INDUS: double, CHAS: double, NOX: double, RM: double, AGE: double, DIS: double, RAD: double, TAX: double, PTRATIO: double, B: double, LSTAT: double, price: double]

In [6]:
df_boston.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- price: double (nullable = true)



In [7]:
df_boston.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'price']

In [8]:
df_boston.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
|summary| CRIM| ZN| INDUS| CHAS| NOX| RM| AGE| DIS| RAD| TAX| PTRATIO| B| LSTAT| price|
+-------+------------------+------------------+------------------+------------------+-------------------+------------------+------------------+-----------------+-----------------+------------------+------------------+------------------+------------------+------------------+
| count| 506| 506| 506| 506| 506| 506| 506| 506| 506| 506| 506| 506| 506| 506|
| mean|3.6135235573122535|11.363636363636363|11.136778656126504|0.0691699604743083| 0.5546950592885372| 6.284634387351787| 68.57490118577078|3.795042687747034|9.549407114624506| 408.2371541501976|18.455533596837967|356.67403162055257|12.653063241106723|22.532806324110698|
| stddev| 

In [9]:
df_boston.show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0| 17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0| 17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0| 18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0| 18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0| 18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0| 15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0| 15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5.631|100.0|6.0821|5.0|311.0| 15

## 2) Manually set DataFrame schema

In [10]:
from pyspark.sql.types import (StructField, StructType,
 IntegerType, StringType, LongType)

In [11]:
data_schema = [StructField('CRIM',StringType(), True )]

In [12]:
final_struc = StructType(fields= data_schema)

In [13]:
# reload the csv BUT WITH PRE-DEFINED SCHEMA AS ABOVE 
df_boston_updated = sqlContext.read\
 .format('com.databricks.spark.csv')\
 .options(header='true', inferschema='true', shema= final_struc)\
 .load('boston.csv')

In [14]:
df_boston_updated.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- price: double (nullable = true)



## 3) Select one column from Spark DF 

In [15]:
# pandas way 
# which is not working here 
type(df_boston['CRIM'])
# df_boston['CRIM'].show() <--- not work

pyspark.sql.column.Column

In [16]:
# pyspark way 
type(df_boston.select('CRIM'))

pyspark.sql.dataframe.DataFrame

In [17]:
# SELECT 1 coluumn
df_boston.select('CRIM').show()

+-------+
| CRIM|
+-------+
|0.00632|
|0.02731|
|0.02729|
|0.03237|
|0.06905|
|0.02985|
|0.08829|
|0.14455|
|0.21124|
|0.17004|
|0.22489|
|0.11747|
|0.09378|
|0.62976|
|0.63796|
|0.62739|
|1.05393|
| 0.7842|
|0.80271|
| 0.7258|
+-------+
only showing top 20 rows



In [18]:
# SELECT multiple coluumns
df_boston.select(['CRIM','B']).show()

+-------+------+
| CRIM| B|
+-------+------+
|0.00632| 396.9|
|0.02731| 396.9|
|0.02729|392.83|
|0.03237|394.63|
|0.06905| 396.9|
|0.02985|394.12|
|0.08829| 395.6|
|0.14455| 396.9|
|0.21124|386.63|
|0.17004|386.71|
|0.22489|392.52|
|0.11747| 396.9|
|0.09378| 390.5|
|0.62976| 396.9|
|0.63796|380.02|
|0.62739|395.62|
|1.05393|386.85|
| 0.7842|386.75|
|0.80271|288.99|
| 0.7258|390.95|
+-------+------+
only showing top 20 rows



In [19]:
df_boston.head(2)

[Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0.0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1.0, TAX=296.0, PTRATIO=15.3, B=396.9, LSTAT=4.98, price=24.0),
 Row(CRIM=0.02731, ZN=0.0, INDUS=7.07, CHAS=0.0, NOX=0.469, RM=6.421, AGE=78.9, DIS=4.9671, RAD=2.0, TAX=242.0, PTRATIO=17.8, B=396.9, LSTAT=9.14, price=21.6)]

In [20]:
df_boston.head(2)[0]

Row(CRIM=0.00632, ZN=18.0, INDUS=2.31, CHAS=0.0, NOX=0.538, RM=6.575, AGE=65.2, DIS=4.09, RAD=1.0, TAX=296.0, PTRATIO=15.3, B=396.9, LSTAT=4.98, price=24.0)

### 4) Create a new column

In [21]:
# add new column : colX
df_boston.withColumn('colX', df_boston['B']).show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price| colX|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0| 396.9|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0| 17.8| 396.9| 9.14| 21.6| 396.9|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0| 17.8|392.83| 4.03| 34.7|392.83|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0| 18.7|394.63| 2.94| 33.4|394.63|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0| 18.7| 396.9| 5.33| 36.2| 396.9|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0| 18.7|394.12| 5.21| 28.7|394.12|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0| 15.2| 395.6|12.43| 22.9| 395.6|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0| 15.2| 396.9|19.15| 

In [22]:
# add new column : colY
df_boston.withColumn('colY', df_boston['B']*2).show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price| colY|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+------+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0| 793.8|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0| 17.8| 396.9| 9.14| 21.6| 793.8|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0| 17.8|392.83| 4.03| 34.7|785.66|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0| 18.7|394.63| 2.94| 33.4|789.26|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0| 18.7| 396.9| 5.33| 36.2| 793.8|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0| 18.7|394.12| 5.21| 28.7|788.24|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0| 15.2| 395.6|12.43| 22.9| 791.2|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0| 15.2| 396.9|19.15| 

## 5) Rename columns 

In [23]:
# rename column "B" -> "BBB"
df_boston.withColumnRenamed('B', 'BBB').show()

+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| BBB|LSTAT|price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0| 17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0| 17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0| 18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0| 18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0| 18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0| 15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0| 15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5.631|100.0|6.0821|5.0|311.0| 

## 6) Pyspark SQL

In [24]:
# create a temp SQL view from df 
df_boston.createOrReplaceTempView('BOSTON')

In [25]:
result=sqlContext.sql("select B from BOSTON LIMIT 10").show()

+------+
| B|
+------+
| 396.9|
| 396.9|
|392.83|
|394.63|
| 396.9|
|394.12|
| 395.6|
| 396.9|
|386.63|
|386.71|
+------+



## 7) Filter data

In [26]:
result2=df_boston.filter("B < 50").show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS| RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|51.1358|0.0| 18.1| 0.0|0.597|5.757|100.0| 1.413|24.0|666.0| 20.2| 2.6|10.11| 15.0|
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0| 20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0| 20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0| 20.2|27.25|29.05| 7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0| 20.2|21.57|25.79| 7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0| 20.2|16.45|20.62| 8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0| 20.2|48.45|22.74| 8.4|
|7.05042|0.0| 18.1| 0.0|0.614|6.103| 85.1|2.0218|24.0|666.0| 20.2| 2.52|23.29| 13.4|
|8.79212|0.0| 18.1| 0.0|0.584|5.565| 70.6|2.0635|24.0|666.0| 20.2| 3.65|17.1

In [27]:
result2=df_boston.filter("B < 50").select(['B']).show()

+-----+
| B|
+-----+
| 2.6|
|35.05|
|28.79|
|27.25|
|21.57|
|16.45|
|48.45|
| 2.52|
| 3.65|
| 7.68|
|24.65|
|18.82|
|27.49|
| 9.32|
|43.06|
| 0.32|
| 6.68|
|10.48|
| 3.5|
|22.01|
+-----+



In [28]:
# python way 
result2=df_boston.filter(df_boston['B']<50).show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS| RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|51.1358|0.0| 18.1| 0.0|0.597|5.757|100.0| 1.413|24.0|666.0| 20.2| 2.6|10.11| 15.0|
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0| 20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0| 20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0| 20.2|27.25|29.05| 7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0| 20.2|21.57|25.79| 7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0| 20.2|16.45|20.62| 8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0| 20.2|48.45|22.74| 8.4|
|7.05042|0.0| 18.1| 0.0|0.614|6.103| 85.1|2.0218|24.0|666.0| 20.2| 2.52|23.29| 13.4|
|8.79212|0.0| 18.1| 0.0|0.584|5.565| 70.6|2.0635|24.0|666.0| 20.2| 3.65|17.1

In [29]:
##### filter in multiple conditions #####

df_boston.filter((df_boston['B'] < 50 ) & (df_boston['B'] > 10 ) ).show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS| RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+-----+-----+-----+
|14.0507|0.0| 18.1| 0.0|0.597|6.657|100.0|1.5275|24.0|666.0| 20.2|35.05|21.22| 17.2|
| 18.811|0.0| 18.1| 0.0|0.597|4.628|100.0|1.5539|24.0|666.0| 20.2|28.79|34.37| 17.9|
|18.0846|0.0| 18.1| 0.0|0.679|6.434|100.0|1.8347|24.0|666.0| 20.2|27.25|29.05| 7.2|
|10.8342|0.0| 18.1| 0.0|0.679|6.782| 90.8|1.8195|24.0|666.0| 20.2|21.57|25.79| 7.5|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0| 20.2|16.45|20.62| 8.8|
|11.8123|0.0| 18.1| 0.0|0.718|6.824| 76.5| 1.794|24.0|666.0| 20.2|48.45|22.74| 8.4|
|12.2472|0.0| 18.1| 0.0|0.584|5.837| 59.7|1.9976|24.0|666.0| 20.2|24.65|15.69| 10.2|
|37.6619|0.0| 18.1| 0.0|0.679|6.202| 78.7|1.8629|24.0|666.0| 20.2|18.82|14.52| 10.9|
|14.4208|0.0| 18.1| 0.0| 0.74|6.461| 93.3|2.0026|24.0|666.0| 20.2|27.49|18.

## 8) Get filter spark data as dict, array.. and process on them

In [30]:
filer_result=df_boston.filter((df_boston['B'] < 50 ) & (df_boston['B'] > 10 ) ).collect()

In [31]:
filer_result

[Row(CRIM=14.0507, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=6.657, AGE=100.0, DIS=1.5275, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=35.05, LSTAT=21.22, price=17.2),
 Row(CRIM=18.811, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=4.628, AGE=100.0, DIS=1.5539, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=28.79, LSTAT=34.37, price=17.9),
 Row(CRIM=18.0846, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=6.434, AGE=100.0, DIS=1.8347, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=27.25, LSTAT=29.05, price=7.2),
 Row(CRIM=10.8342, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=6.782, AGE=90.8, DIS=1.8195, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=21.57, LSTAT=25.79, price=7.5),
 Row(CRIM=73.5341, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.679, RM=5.957, AGE=100.0, DIS=1.8026, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=16.45, LSTAT=20.62, price=8.8),
 Row(CRIM=11.8123, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.718, RM=6.824, AGE=76.5, DIS=1.794, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=48.45, LSTAT=22.74, price=8.4),
 Row(CRIM=12.2472, ZN=0.0, IND

In [32]:
type(filer_result)

list

In [33]:
filer_result[0]

Row(CRIM=14.0507, ZN=0.0, INDUS=18.1, CHAS=0.0, NOX=0.597, RM=6.657, AGE=100.0, DIS=1.5275, RAD=24.0, TAX=666.0, PTRATIO=20.2, B=35.05, LSTAT=21.22, price=17.2)

In [34]:
filer_result[0].asDict()

{'AGE': 100.0,
 'B': 35.05,
 'CHAS': 0.0,
 'CRIM': 14.0507,
 'DIS': 1.5275,
 'INDUS': 18.1,
 'LSTAT': 21.22,
 'NOX': 0.597,
 'PTRATIO': 20.2,
 'RAD': 24.0,
 'RM': 6.657,
 'TAX': 666.0,
 'ZN': 0.0,
 'price': 17.2}

In [35]:
filer_result[0].asDict()['AGE']

100.0

In [36]:
# END OF COURSE 8.26 
# next 8.27

## 9) Spark df groupby 

In [37]:
df_boston.groupby("RAD")



In [38]:
df_boston.groupby("RAD").mean()

DataFrame[RAD: double, avg(CRIM): double, avg(ZN): double, avg(INDUS): double, avg(CHAS): double, avg(NOX): double, avg(RM): double, avg(AGE): double, avg(DIS): double, avg(RAD): double, avg(TAX): double, avg(PTRATIO): double, avg(B): double, avg(LSTAT): double, avg(price): double]

In [39]:
df_boston.groupby("RAD").mean().show()

+----+--------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+--------+------------------+------------------+------------------+------------------+------------------+
| RAD| avg(CRIM)| avg(ZN)| avg(INDUS)| avg(CHAS)| avg(NOX)| avg(RM)| avg(AGE)| avg(DIS)|avg(RAD)| avg(TAX)| avg(PTRATIO)| avg(B)| avg(LSTAT)| avg(price)|
+----+--------------------+------------------+------------------+-------------------+-------------------+------------------+-----------------+------------------+--------+------------------+------------------+------------------+------------------+------------------+
| 8.0| 0.3714095833333333| 6.25| 5.9325|0.20833333333333334|0.49249999999999977|6.9507916666666665| 67.35| 4.410604166666666| 8.0| 301.25| 17.97499999999999| 385.2775|7.9608333333333325|30.358333333333334|
| 7.0| 0.15039941176470584|26.705882352941178| 5.034705882352943| 0.0| 0.4409999999999999| 6.647764705882355

In [40]:
# agg 
df_boston.agg({'RAD':'mean'}).show()

+-----------------+
| avg(RAD)|
+-----------------+
|9.549407114624506|
+-----------------+



In [41]:
# group + agg 
group_data = df_boston.groupby('RAD')
group_data.agg({'ZN': 'max'}).show()

+----+-------+
| RAD|max(ZN)|
+----+-------+
| 8.0| 25.0|
| 7.0| 34.0|
| 1.0| 90.0|
| 4.0| 95.0|
| 3.0| 95.0|
| 2.0| 85.0|
| 6.0| 52.5|
| 5.0| 100.0|
|24.0| 0.0|
+----+-------+



In [42]:
# group + agg for loop -- general case 

group_data = df_boston.groupby('RAD')
for col in ['NOX','RM','AGE']:
 group_data.agg({col: 'max'}).show()

+----+--------+
| RAD|max(NOX)|
+----+--------+
| 8.0| 0.507|
| 7.0| 0.472|
| 1.0| 0.573|
| 4.0| 0.624|
| 3.0| 0.488|
| 2.0| 0.581|
| 6.0| 0.585|
| 5.0| 0.871|
|24.0| 0.77|
+----+--------+

+----+-------+
| RAD|max(RM)|
+----+-------+
| 8.0| 8.725|
| 7.0| 8.259|
| 1.0| 7.923|
| 4.0| 8.034|
| 3.0| 7.831|
| 2.0| 8.069|
| 6.0| 6.897|
| 5.0| 8.704|
|24.0| 8.78|
+----+-------+

+----+--------+
| RAD|max(AGE)|
+----+--------+
| 8.0| 93.4|
| 7.0| 79.2|
| 1.0| 91.0|
| 4.0| 100.0|
| 3.0| 95.6|
| 2.0| 97.0|
| 6.0| 95.4|
| 5.0| 100.0|
|24.0| 100.0|
+----+--------+



In [43]:
# import spark default func : countDistinct, avg, stddev

df_boston.select(countDistinct('CRIM')).show()

+--------------------+
|count(DISTINCT CRIM)|
+--------------------+
| 504|
+--------------------+



In [44]:
df_boston.select(avg('CRIM')).show()

# compare with the groupby method 
# df_boston.agg({'CRIM':'mean'}).show()

+------------------+
| avg(CRIM)|
+------------------+
|3.6135235573122535|
+------------------+



In [45]:
# rename the result col 
df_boston.select(countDistinct('CRIM').alias('AVG_CRIM')).show()

+--------+
|AVG_CRIM|
+--------+
| 504|
+--------+



In [46]:
# format result digits 

crim_std = df_boston.select(stddev('CRIM').alias('std'))

print (crim_std.show())

print (crim_std.select(format_number('std',2)).show())

+-----------------+
| std|
+-----------------+
|8.601545105332491|
+-----------------+

None
+---------------------+
|format_number(std, 2)|
+---------------------+
| 8.60|
+---------------------+

None


In [47]:
# order by 

df_boston.orderBy('CRIM').show()

+-------+-----+-----+----+------+-----+----+-------+---+-----+-------+------+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+-----+-----+----+------+-----+----+-------+---+-----+-------+------+-----+-----+
|0.00632| 18.0| 2.31| 0.0| 0.538|6.575|65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0|
|0.00906| 90.0| 2.97| 0.0| 0.4|7.088|20.8| 7.3073|1.0|285.0| 15.3|394.72| 7.85| 32.2|
|0.01096| 55.0| 2.25| 0.0| 0.389|6.453|31.9| 7.3073|1.0|300.0| 15.3|394.72| 8.23| 22.0|
|0.01301| 35.0| 1.52| 0.0| 0.442|7.241|49.3| 7.0379|1.0|284.0| 15.5|394.74| 5.49| 32.7|
|0.01311| 90.0| 1.22| 0.0| 0.403|7.249|21.9| 8.6966|5.0|226.0| 17.9|395.93| 4.81| 35.4|
| 0.0136| 75.0| 4.0| 0.0| 0.41|5.888|47.6| 7.3197|3.0|469.0| 21.1| 396.9| 14.8| 18.9|
|0.01381| 80.0| 0.46| 0.0| 0.422|7.875|32.0| 5.6484|4.0|255.0| 14.4|394.23| 2.97| 50.0|
|0.01432|100.0| 1.32| 0.0| 0.411|6.816|40.5| 8.3248|5.0|256.0| 15.1| 392.9| 3.95| 31.6|
|0.01439| 60.0| 2.93| 0.0| 0.401|6.604|18.8| 6.2

In [48]:
# order by : inverse order 

df_boston.orderBy(df_boston['CRIM'].desc()).show()

+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS| RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+---+-----+----+-----+-----+-----+------+----+-----+-------+------+-----+-----+
|88.9762|0.0| 18.1| 0.0|0.671|6.968| 91.9|1.4165|24.0|666.0| 20.2| 396.9|17.21| 10.4|
|73.5341|0.0| 18.1| 0.0|0.679|5.957|100.0|1.8026|24.0|666.0| 20.2| 16.45|20.62| 8.8|
|67.9208|0.0| 18.1| 0.0|0.693|5.683|100.0|1.4254|24.0|666.0| 20.2|384.97|22.98| 5.0|
|51.1358|0.0| 18.1| 0.0|0.597|5.757|100.0| 1.413|24.0|666.0| 20.2| 2.6|10.11| 15.0|
|45.7461|0.0| 18.1| 0.0|0.693|4.519|100.0|1.6582|24.0|666.0| 20.2| 88.27|36.98| 7.0|
|41.5292|0.0| 18.1| 0.0|0.693|5.531| 85.4|1.6074|24.0|666.0| 20.2|329.46|27.38| 8.5|
|38.3518|0.0| 18.1| 0.0|0.693|5.453|100.0|1.4896|24.0|666.0| 20.2| 396.9|30.59| 5.0|
|37.6619|0.0| 18.1| 0.0|0.679|6.202| 78.7|1.8629|24.0|666.0| 20.2| 18.82|14.52| 10.9|
|28.6558|0.0| 18.1| 0.0|0.597|5.155|100.0|1.5894|24.0|666.0| 20.2|21

## 10) Missing data 

In [49]:
# drop null data 
df_boston.na.drop().show()

# only drop when all column data is null 
#df_boston.na.drop(how='all').show()

# only drop if specfic column data is null 
#df_boston.na.drop(subset = ['CRIM']).show()


# fill null data 
#df_boston.na.fill('FILLED VALUE').show()


# fill with avg value
# from pyspark.sql.functions import mean 
# mean_val = df_boston.select(mean(df['CRIM'])).collect()
# mean_val_ = mean_val[0][0]
#df_boston.na.fill(mean_val_, ['CRIM']).show()




+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+----+-----+----+-----+-----+-----+------+---+-----+-------+------+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575| 65.2| 4.09|1.0|296.0| 15.3| 396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421| 78.9|4.9671|2.0|242.0| 17.8| 396.9| 9.14| 21.6|
|0.02729| 0.0| 7.07| 0.0|0.469|7.185| 61.1|4.9671|2.0|242.0| 17.8|392.83| 4.03| 34.7|
|0.03237| 0.0| 2.18| 0.0|0.458|6.998| 45.8|6.0622|3.0|222.0| 18.7|394.63| 2.94| 33.4|
|0.06905| 0.0| 2.18| 0.0|0.458|7.147| 54.2|6.0622|3.0|222.0| 18.7| 396.9| 5.33| 36.2|
|0.02985| 0.0| 2.18| 0.0|0.458| 6.43| 58.7|6.0622|3.0|222.0| 18.7|394.12| 5.21| 28.7|
|0.08829|12.5| 7.87| 0.0|0.524|6.012| 66.6|5.5605|5.0|311.0| 15.2| 395.6|12.43| 22.9|
|0.14455|12.5| 7.87| 0.0|0.524|6.172| 96.1|5.9505|5.0|311.0| 15.2| 396.9|19.15| 27.1|
|0.21124|12.5| 7.87| 0.0|0.524|5.631|100.0|6.0821|5.0|311.0| 15

## 11) Dates and Timestamp 

In [50]:
# load timestamp csv 

# load the data
df_timestamp = sqlContext.read\
 .format('com.databricks.spark.csv')\
 .options(header='true', inferschema='true')\
 .load('df_test.csv')
 
# rename column "_c0" -> "time"
df_timestamp = df_timestamp.withColumnRenamed('_c0', 'time')

In [52]:
df_timestamp.show()

+-------------------+---------------+---------------+---------------+-----------------+
| time| A| B| C| D|
+-------------------+---------------+---------------+---------------+-----------------+
|2013-01-01 00:00:00| 1.74284507345| 0.827612028809|-0.806632282957| 0.306776368559|
|2013-01-02 00:00:00| 1.33922469984|-0.795165583739| -1.77473024596| 0.437705419137|
|2013-01-03 00:00:00| 0.652848767373| -1.54077088745| -1.41021709968| 1.70035456582|
|2013-01-04 00:00:00| 0.247396585844| -1.30234054482|-0.846092769624| 0.699764788482|
|2013-01-05 00:00:00|-0.740530243845| 0.713124246753| -1.16320459805|-0.00284413632768|
|2013-01-06 00:00:00| -1.17173754177| 0.560340996287| 0.946558700023| -0.615189047933|
+-------------------+---------------+---------------+---------------+-----------------+



In [53]:
# import timestamp OP methods 
from pyspark.sql.functions import (dayofmonth,
 hour,
 dayofyear,
 month, 
 year,
 weekofyear,
 format_number,
 date_format)

In [54]:
# transform timestemp via timestamp OP methods 

df_timestamp.select(dayofmonth(df_timestamp['time']),
 month(df_timestamp['time']),
 year(df_timestamp['time']),
 (df_timestamp['time'])).show()

+----------------+-----------+----------+-------------------+
|dayofmonth(time)|month(time)|year(time)| time|
+----------------+-----------+----------+-------------------+
| 1| 1| 2013|2013-01-01 00:00:00|
| 2| 1| 2013|2013-01-02 00:00:00|
| 3| 1| 2013|2013-01-03 00:00:00|
| 4| 1| 2013|2013-01-04 00:00:00|
| 5| 1| 2013|2013-01-05 00:00:00|
| 6| 1| 2013|2013-01-06 00:00:00|
+----------------+-----------+----------+-------------------+



In [55]:
# get avg value per year 
df_timestamp_new = df_timestamp.withColumn('year', year(df_timestamp['time']))
df_timestamp_new.groupby('year').mean().show()

+----+------------------+--------------+-------------------+------------------+---------+
|year| avg(A)| avg(B)| avg(C)| avg(D)|avg(year)|
+----+------------------+--------------+-------------------+------------------+---------+
|2013|0.3450078901486666|-0.25619995736|-0.8423863827079999|0.4210946596228866| 2013.0|
+----+------------------+--------------+-------------------+------------------+---------+



In [56]:
# only select the needed columns 
df_timestamp_new.groupby('year').mean().select(['year','avg(A)']).show()

+----+------------------+
|year| avg(A)|
+----+------------------+
|2013|0.3450078901486666|
+----+------------------+



In [57]:
# format the outcome 
result = df_timestamp_new.groupby('year').mean().select(['year','avg(A)'])
new_result = result.withColumnRenamed("avg(A)", "Average A value")
new_result.select(format_number("Average A value", 2)).alias('Average A value').show()


new_result.show()

+---------------------------------+
|format_number(Average A value, 2)|
+---------------------------------+
| 0.35|
+---------------------------------+

+----+------------------+
|year| Average A value|
+----+------------------+
|2013|0.3450078901486666|
+----+------------------+



In [58]:
# end of Section 8 
# next : Section 9 