In [1]:
import os, sys, glob, datetime

# specify spark version, python version
spark_home = "/home/zero/spark-2.4.0-bin-hadoop2.7" # MODIFY THIS
python_path="/apps/anaconda3/bin/python"
# set environment variables
os.environ['SPARK_HOME'] = spark_home
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['SPARK_LOCAL_IP'] = "127.0.0.1"

def setup_spark_env(app_name):
 # set environment variables
 spark_python = os.path.join(spark_home, 'python')
 py4j = glob.glob(os.path.join(spark_python, 'lib', 'py4j-*.zip'))[0]
 sys.path[:0] = [spark_python, py4j]
 # specify Spark application parameters
 PYSPARK_SUBMIT_ARGS="--master local[2]"

 os.environ['PYSPARK_SUBMIT_ARGS'] = (PYSPARK_SUBMIT_ARGS 
 + " --name '%s_%s'"%(app_name, datetime.datetime.now().strftime("%Y%m%d %H:%M")) 
 + " pyspark-shell") 
 return

#
setup_spark_env("your_spark_process_name") # MODIFY THIS
# launching PySpark application
# execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))
filename=os.path.join(spark_home, 'python/pyspark/shell.py')
exec(compile(open(filename, "rb").read(), filename, 'exec'))
sc.setLogLevel('ERROR')
print("{}".format(sc.applicationId))

Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /__ / .__/\_,_/_/ /_/\_\ version 2.4.0
 /_/

Using Python version 3.6.4 (default, Jan 16 2018 18:10:19)
SparkSession available as 'spark'.
local-1557023299182


In [2]:
from pyspark.sql import functions as sf
from pyspark.sql import Row
from pyspark.sql.types import *
import numpy as np

In [3]:
import os, math, subprocess
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# some settings for displaying Pandas results
pd.set_option('display.width', 2000)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.precision', 4)
pd.set_option('display.max_colwidth', -1)

In [4]:
# load data
data_path = "home-credit-default-risk/application_train.csv"
df_data = sqlContext.read.format("csv").option("header", "true").load(data_path)
df_data.printSchema()
print("Rows: {}, Cols: {}".format(df_data.count(), len(df_data.columns)))

root
 |-- SK_ID_CURR: string (nullable = true)
 |-- TARGET: string (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: string (nullable = true)
 |-- AMT_INCOME_TOTAL: string (nullable = true)
 |-- AMT_CREDIT: string (nullable = true)
 |-- AMT_ANNUITY: string (nullable = true)
 |-- AMT_GOODS_PRICE: string (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: string (nullable = true)
 |-- DAYS_BIRTH: string (nullable = true)
 |-- DAYS_EMPLOYED: string (nullable = true)
 |-- DAYS_REGISTRATION: string (nullable = true)
 |-- DAYS_ID_PUBLISH: string (nullable = true)
 |-- OWN_CAR_AGE: str

# SELECT, WHERE, DISTINCT, LIMIT

In [5]:
print("""
 SELECT *
 FROM pdf_data
 LIMIT 3
""")
df_data.limit(3).take(1)


 SELECT *
 FROM pdf_data
 LIMIT 3



[Row(SK_ID_CURR='100002', TARGET='1', NAME_CONTRACT_TYPE='Cash loans', CODE_GENDER='M', FLAG_OWN_CAR='N', FLAG_OWN_REALTY='Y', CNT_CHILDREN='0', AMT_INCOME_TOTAL='202500.0', AMT_CREDIT='406597.5', AMT_ANNUITY='24700.5', AMT_GOODS_PRICE='351000.0', NAME_TYPE_SUITE='Unaccompanied', NAME_INCOME_TYPE='Working', NAME_EDUCATION_TYPE='Secondary / secondary special', NAME_FAMILY_STATUS='Single / not married', NAME_HOUSING_TYPE='House / apartment', REGION_POPULATION_RELATIVE='0.018801', DAYS_BIRTH='-9461', DAYS_EMPLOYED='-637', DAYS_REGISTRATION='-3648.0', DAYS_ID_PUBLISH='-2120', OWN_CAR_AGE=None, FLAG_MOBIL='1', FLAG_EMP_PHONE='1', FLAG_WORK_PHONE='0', FLAG_CONT_MOBILE='1', FLAG_PHONE='1', FLAG_EMAIL='0', OCCUPATION_TYPE='Laborers', CNT_FAM_MEMBERS='1.0', REGION_RATING_CLIENT='2', REGION_RATING_CLIENT_W_CITY='2', WEEKDAY_APPR_PROCESS_START='WEDNESDAY', HOUR_APPR_PROCESS_START='10', REG_REGION_NOT_LIVE_REGION='0', REG_REGION_NOT_WORK_REGION='0', LIVE_REGION_NOT_WORK_REGION='0', REG_CITY_NOT_LI

In [6]:
print("""
 SELECT NAME_CONTRACT_TYPE
 FROM pdf_data
 WHERE CODE_GENDER = 'M'
""")
df_data.where("CODE_GENDER = 'M'").select("NAME_CONTRACT_TYPE").show(5)


 SELECT NAME_CONTRACT_TYPE
 FROM pdf_data
 WHERE CODE_GENDER = 'M'

+------------------+
|NAME_CONTRACT_TYPE|
+------------------+
| Cash loans|
| Revolving loans|
| Cash loans|
| Cash loans|
| Cash loans|
+------------------+
only showing top 5 rows



In [7]:
print("""
 SELECT DISTINCT NAME_CONTRACT_TYPE
 FROM pdf_data
""")
df_data.select("NAME_CONTRACT_TYPE").distinct().show(5)


 SELECT DISTINCT NAME_CONTRACT_TYPE
 FROM pdf_data

+------------------+
|NAME_CONTRACT_TYPE|
+------------------+
| Revolving loans|
| Cash loans|
+------------------+



# SELECT with multiple conditions

In [8]:
print("""
 SELECT NAME_INCOME_TYPE, CODE_GENDER, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE CODE_GENDER = 'M' AND AMT_INCOME_TOTAL > 200000.0
""")

(df_data.where("CODE_GENDER = 'M' and AMT_INCOME_TOTAL > 200000.0")
 .select("NAME_INCOME_TYPE", "CODE_GENDER", "AMT_INCOME_TOTAL")).show(5)


 SELECT NAME_INCOME_TYPE, CODE_GENDER, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE CODE_GENDER = 'M' AND AMT_INCOME_TOTAL > 200000.0

+--------------------+-----------+----------------+
| NAME_INCOME_TYPE|CODE_GENDER|AMT_INCOME_TOTAL|
+--------------------+-----------+----------------+
| Working| M| 202500.0|
| State servant| M| 360000.0|
| Working| M| 225000.0|
| State servant| M| 270000.0|
|Commercial associate| M| 360000.0|
+--------------------+-----------+----------------+
only showing top 5 rows



# ORDER BY

In [9]:
print("""
 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 ORDER BY AMT_INCOME_TOTAL
""")
df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL").orderBy(["AMT_INCOME_TOTAL"]).show(5)


 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 ORDER BY AMT_INCOME_TOTAL

+--------------------+----------------+
| NAME_INCOME_TYPE|AMT_INCOME_TOTAL|
+--------------------+----------------+
| Working| 100071.0|
|Commercial associate| 100089.0|
| Working| 100125.0|
|Commercial associate| 1001826.0|
| State servant| 100278.0|
+--------------------+----------------+
only showing top 5 rows



In [10]:
print("""
 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 ORDER BY AMT_INCOME_TOTAL DESC
""")
df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL").orderBy(["AMT_INCOME_TOTAL"], ascending=[0]).show(5)


 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 ORDER BY AMT_INCOME_TOTAL DESC

+----------------+----------------+
|NAME_INCOME_TYPE|AMT_INCOME_TOTAL|
+----------------+----------------+
| Pensioner| 99900.0|
| Pensioner| 99900.0|
| Pensioner| 99900.0|
| Pensioner| 99900.0|
| Pensioner| 99900.0|
+----------------+----------------+
only showing top 5 rows



# IN… NOT IN

In [11]:
print("""
 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE SK_ID_CURR IN (100002, 100010, 100011)
""")

(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
 .where("SK_ID_CURR IN {}".format((100002, 100010, 100011)))).show()


 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE SK_ID_CURR IN (100002, 100010, 100011)

+----------------+----------------+
|NAME_INCOME_TYPE|AMT_INCOME_TOTAL|
+----------------+----------------+
| Working| 202500.0|
| State servant| 360000.0|
| Pensioner| 112500.0|
+----------------+----------------+



In [12]:
print("""
 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE SK_ID_CURR NOT IN (100002, 100010, 100011)
""")

(df_data.select("NAME_INCOME_TYPE", "AMT_INCOME_TOTAL")
 .where("SK_ID_CURR NOT IN {}".format((100002, 100010, 100011)))).show(5)


 SELECT NAME_INCOME_TYPE, AMT_INCOME_TOTAL
 FROM pdf_data
 WHERE SK_ID_CURR NOT IN (100002, 100010, 100011)

+----------------+----------------+
|NAME_INCOME_TYPE|AMT_INCOME_TOTAL|
+----------------+----------------+
| State servant| 270000.0|
| Working| 67500.0|
| Working| 135000.0|
| Working| 121500.0|
| State servant| 99000.0|
+----------------+----------------+
only showing top 5 rows



# GROUP BY, COUNT, ORDER BY

In [13]:
print("""
 SELECT CODE_GENDER, COUNT(TARGET)
 FROM pdf_data
 GROUP BY CODE_GENDER
 ORDER BY NUM_TARGET
""")

df_data.groupBy("CODE_GENDER").agg(sf.count(sf.col("TARGET")).alias("count")).orderBy("count").show()


 SELECT CODE_GENDER, COUNT(TARGET)
 FROM pdf_data
 GROUP BY CODE_GENDER
 ORDER BY NUM_TARGET

+-----------+------+
|CODE_GENDER| count|
+-----------+------+
| XNA| 4|
| M|105059|
| F|202448|
+-----------+------+



# Aggregate functions (MIN, MAX, MEAN)

In [14]:
print("""
 SELECT MAX(AMT_INCOME_TOTAL), MIN(AMT_INCOME_TOTAL), MEAN(AMT_INCOME_TOTAL)
 FROM pdf_data
""")

df_data.selectExpr(["MAX(AMT_INCOME_TOTAL)", 
 "MIN(AMT_INCOME_TOTAL)", 
 "MEAN(AMT_INCOME_TOTAL)"]).show()


 SELECT MAX(AMT_INCOME_TOTAL), MIN(AMT_INCOME_TOTAL), MEAN(AMT_INCOME_TOTAL)
 FROM pdf_data

+---------------------+---------------------+-------------------------------------+
|max(AMT_INCOME_TOTAL)|min(AMT_INCOME_TOTAL)|avg(CAST(AMT_INCOME_TOTAL AS DOUBLE))|
+---------------------+---------------------+-------------------------------------+
| 99900.0| 100071.0| 168797.91929698453|
+---------------------+---------------------+-------------------------------------+



# JOIN

- (INNER) JOIN: Returns records that have matching values in both tables
- LEFT (OUTER) JOIN: Return all records from the left table, and the matched records from the right table
- RIGHT (OUTER) JOIN: Return all records from the right table, and the matched records from the left table
- FULL (OUTER) JOIN: Return all records when there is a match in either left or right table

In [15]:
print("""
 SELECT *
 FROM df1
 INNER JOIN df2
 ON df1.SK_ID_CURR = df2.SK_ID_CURR
""")

df1 = df_data.select("SK_ID_CURR", "AMT_INCOME_TOTAL")
df2 = df_data.select("SK_ID_CURR", "CODE_GENDER", "FLAG_OWN_CAR")
df1.join(df2, on="SK_ID_CURR", how="inner").show()


 SELECT *
 FROM df1
 INNER JOIN df2
 ON df1.SK_ID_CURR = df2.SK_ID_CURR

+----------+----------------+-----------+------------+
|SK_ID_CURR|AMT_INCOME_TOTAL|CODE_GENDER|FLAG_OWN_CAR|
+----------+----------------+-----------+------------+
| 100002| 202500.0| M| N|
| 100003| 270000.0| F| N|
| 100004| 67500.0| M| Y|
| 100006| 135000.0| F| N|
| 100007| 121500.0| M| N|
| 100008| 99000.0| M| N|
| 100009| 171000.0| F| Y|
| 100010| 360000.0| M| Y|
| 100011| 112500.0| F| N|
| 100012| 135000.0| M| N|
| 100014| 112500.0| F| N|
| 100015| 38419.155| F| N|
| 100016| 67500.0| F| N|
| 100017| 225000.0| M| Y|
| 100018| 189000.0| F| N|
| 100019| 157500.0| M| Y|
| 100020| 108000.0| M| N|
| 100021| 81000.0| F| N|
| 100022| 112500.0| F| N|
| 100023| 90000.0| F| N|
+----------+----------------+-----------+------------+
only showing top 20 rows



# UNION ALL and UNION

In [16]:
print("""
 SELECT * FROM df1
 UNION ALL
 SELECT * FROM df2
""")

df1 = df_data.select("CODE_GENDER", "FLAG_OWN_CAR")
df2 = df_data.select("CODE_GENDER", "FLAG_OWN_CAR")

print("Union all:", df1.unionAll(df2).count())
print("Union:", df1.unionAll(df2).distinct().count())


 SELECT * FROM df1
 UNION ALL
 SELECT * FROM df2

Union all: 615022
Union: 6


# Insert, delete, update

Spark data frame is immutable