In [1]:
%matplotlib inline
from pyspark.sql import SparkSession
import pandas as pd

import sys
sys.path.append('..')
from utils.pysparkutils import *

spark = SparkSession.builder.appName("titanic").getOrCreate()

In [2]:
# Both CSV files are read with the same settings: the first row is the
# header and the column types are inferred from the data.
csvOptions = {'header': "true", 'inferSchema': "true"}
train = spark.read.csv('./train.csv', **csvOptions)
test = spark.read.csv('./test.csv', **csvOptions)

# Show the inferred schema of each frame (train first, then test).
for frame in (train, test):
    frame.printSchema()


root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)

root
 |-- PassengerId: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [3]:
# Preview at most 20 training rows, converted to pandas for rich display.
trainPreview = train.limit(20)
trainPreview.toPandas()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S
5,6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
6,7,0,1,"McCarthy, Mr. Timothy J",male,54.0,0,0,17463,51.8625,E46,S
7,8,0,3,"Palsson, Master. Gosta Leonard",male,2.0,3,1,349909,21.075,,S
8,9,1,3,"Johnson, Mrs. Oscar W (Elisabeth Vilhelmina Berg)",female,27.0,0,2,347742,11.1333,,S
9,10,1,2,"Nasser, Mrs. Nicholas (Adele Achem)",female,14.0,1,0,237736,30.0708,,C


In this section we will explore missing data.

In [4]:
# Helper from utils/pysparkutils; the output below pairs each column that has
# missing values with the fraction of rows missing.
missingValueCols = findMissingValuesCols(train)
missingValueCols

[('Age', 0.19865319865319866),
 ('Cabin', 0.7710437710437711),
 ('Embarked', 0.002244668911335578)]

Almost 80% of the values in the Cabin column are missing, so we will drop that column.
Very few values are missing in the Embarked column, so we will simply drop those rows.
About 20% of the Age values are missing; these will be imputed with the median age.

In [5]:
from pyspark.ml.feature import Imputer
# Fill missing ages with the column median. The filled values are written to
# a separate `imputedAge` column.
ageImputer = Imputer(
    strategy='median',
    inputCols=['Age'],
    outputCols=['imputedAge'],
)

In [6]:
# Keep only the rows whose port of embarkation is known, and discard the
# Cabin column.
train = train.filter(train['Embarked'].isNotNull()).drop('Cabin')

# Confirm the resulting schema and the number of remaining rows.
train.printSchema()
train.count()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Embarked: string (nullable = true)



889

# Exploratory Data Analysis
In the next few sections, we will explore the training data and the relationships between the features and the label.
As is well known, most of the passengers on the Titanic did not survive. Our training data agrees: only about 38% of the passengers in it survived. Survival rates also differ by passenger class and sex, as the tables below show.

In [7]:
# Name of the label column, reused by the cells that follow.
labelCol = 'Survived'

# Number of training rows per label value.
labelCounts = train.groupBy(labelCol).count()
labelCounts.toPandas()

Unnamed: 0,Survived,count
0,1,340
1,0,549


In [8]:
# Contingency table of the survival label against passenger sex.
sexCrosstab = train.crosstab(labelCol, 'Sex')
sexCrosstab.toPandas()

Unnamed: 0,Survived_Sex,female,male
0,1,231,109
1,0,81,468


Pointwise Mutual Information (PMI) is a useful metric for exploring the relationship between two categorical features. PMI gives a scalar value for each pair of values taken by the two features. The value denotes the amount of information that can be derived about one categorical value from observing the other.

PMI can be normalized to the range [-1, +1]; the result is called Normalized PMI, with the following interpretation:
* -1 (in the limit) for never occurring together,
* 0 for independence,
* +1 for complete co-occurrence


`calcNormalizedPointwiseMutualInformation` function is implemented in `pysparkutils.py` file in `utils` directory.

In [9]:
# Normalized PMI for every (Sex, Survived) value pair, rendered as a table.
sexPmis = calcNormalizedPointwiseMutualInformation(train, 'Sex', labelCol)
toPandasDF(
    sexPmis,
    'Normalized PMI',
    'Sex',
    labelCol,
)

Unnamed: 0,Sex,Survived,Normalized PMI
2,female,0,-0.361721
1,female,1,0.490151
0,male,0,0.424895
3,male,1,-0.336078


In [10]:
# Contingency table of the survival label against passenger class.
pclassCrosstab = train.crosstab(labelCol, 'Pclass')
pclassCrosstab.toPandas()

Unnamed: 0,Survived_Pclass,1,2,3
0,1,134,87,119
1,0,80,97,372


In [11]:
# Normalized PMI for every (Pclass, Survived) value pair, rendered as a table.
pclassPmis = calcNormalizedPointwiseMutualInformation(train, 'Pclass', labelCol)
toPandasDF(
    pclassPmis,
    'Normalized PMI',
    'Pclass',
    labelCol,
)

Unnamed: 0,Pclass,Survived,Normalized PMI
0,1,0,-0.208445
2,1,1,0.260544
4,2,0,-0.071421
3,2,1,0.091268
5,3,0,0.234674
1,3,1,-0.22684


In [12]:
# Contingency table of the survival label against port of embarkation.
embarkedCrosstab = train.crosstab(labelCol, 'Embarked')
embarkedCrosstab.toPandas()

Unnamed: 0,Survived_Embarked,C,Q,S
0,1,93,30,217
1,0,75,47,427


In [13]:
# Normalized PMI for every (Embarked, Survived) value pair, rendered as a table.
embarkedPmis = calcNormalizedPointwiseMutualInformation(train, 'Embarked', labelCol)
toPandasDF(
    embarkedPmis,
    'Normalized PMI',
    'Embarked',
    labelCol,
)

Unnamed: 0,Embarked,Survived,Normalized PMI
5,C,0,-0.131229
3,C,1,0.163804
4,Q,0,-0.003966
0,Q,1,0.005472
1,S,0,0.096935
2,S,1,-0.08981


In [14]:
# Contingency table of the survival label against the SibSp column.
sibSpCrosstab = train.crosstab(labelCol, 'SibSp')
sibSpCrosstab.toPandas()

Unnamed: 0,Survived_SibSp,0,1,2,3,4,5,8
0,1,208,112,13,4,3,0,0
1,0,398,97,15,12,15,5,7


In [15]:
# Normalized PMI for every (SibSp, Survived) value pair, rendered as a table.
sibSpPmis = calcNormalizedPointwiseMutualInformation(train, 'SibSp', labelCol)
toPandasDF(
    sibSpPmis,
    'Normalized PMI',
    'SibSp',
    labelCol,
)

Unnamed: 0,SibSp,Survived,Normalized PMI
4,0,0,0.076614
6,0,1,-0.074483
0,1,0,-0.128928
3,1,1,0.162829
7,2,0,-0.034825
5,2,1,0.045891
10,3,0,0.045135
1,3,1,-0.078675
2,4,0,0.073413
11,4,1,-0.145939


In [16]:
# Contingency table of the survival label against the Parch column.
parchCrosstab = train.crosstab(labelCol, 'Parch')
parchCrosstab.toPandas()

Unnamed: 0,Survived_Parch,0,1,2,3,4,5,6
0,1,231,65,40,3,0,1,0
1,0,445,53,40,2,4,4,1


In [17]:
# Normalized PMI for every (Parch, Survived) value pair, rendered as a table.
parchPmis = calcNormalizedPointwiseMutualInformation(train, 'Parch', labelCol)
toPandasDF(
    parchPmis,
    'Normalized PMI',
    'Parch',
    labelCol,
)

Unnamed: 0,Parch,Survived,Normalized PMI
5,0,0,0.092309
7,0,1,-0.083569
0,1,0,-0.112913
4,1,1,0.139486
8,2,0,-0.068086
6,2,1,0.086419
11,3,0,-0.071231
1,3,1,0.079123
3,4,0,0.089196
9,5,0,0.047902


Now we will calculate the normalized entropy of each categorical feature. Entropy plays a role similar to variance for categorical data: it measures how evenly a feature's values are spread across its categories.

In [18]:
# Normalized entropy of each categorical feature examined above, shown as a
# two-column (Feature, Entropy) table.
categoricalCols = ['Sex', 'Pclass', 'Embarked', 'SibSp', 'Parch']
entropies = calcNormalizedEntropy(train, *categoricalCols)
dictToPandasDF(entropies, 'Feature', 'Entropy')

Unnamed: 0,Feature,Entropy
0,Sex,0.934919
1,Pclass,0.907245
2,Embarked,0.692048
3,SibSp,0.477435
4,Parch,0.40251


Next, we test whether each categorical feature is independent of the label using a chi-square test.

In [19]:
from pyspark.ml import Pipeline
from pyspark.ml.stat import ChiSquareTest
from pyspark.ml.feature import Bucketizer, OneHotEncoderEstimator, StringIndexer, VectorAssembler, VectorIndexer

# EDA-only preprocessing stages. Every object built in this cell carries an
# `eda` prefix so that it cannot be confused with, or overwrite, the similarly
# named stages of the classification pipeline defined later in the notebook.
edaEmbarkedIndexer = StringIndexer(inputCol='Embarked', outputCol='indexedEmbarked')
edaSexIndexer = StringIndexer(inputCol='Sex', outputCol='indexedSex')

# Median-impute Age into `imputedAge` so the age bucketizer sees no gaps.
edaAgeImputer = Imputer(inputCols=['Age'], outputCols=['imputedAge'], strategy='median')

# Bucket boundaries used to discretize the continuous Age and Fare columns.
ageSplits = [0, 16, 32, 48, 64, 200]
edaAgeBucketizer = Bucketizer(splits=ageSplits, inputCol='imputedAge', outputCol='bucketedAge')

fareSplits = [-float('inf'), 7.91, 14.454, 31, float('inf')]
edaFareBucketizer = Bucketizer(splits=fareSplits, inputCol='Fare', outputCol='bucketedFare')

# One-hot encode the indexed / bucketed columns.
edaOneHotEncoder = OneHotEncoderEstimator(
    inputCols=['indexedSex', 'indexedEmbarked', 'bucketedFare', 'bucketedAge'],
    outputCols=['oneHotSex', 'oneHotEmbarked', 'oneHotFare', 'oneHotAge'])

# Pclass goes in as-is; the other features go in as one-hot vectors.
edaFeatureCols = ['Pclass', 'oneHotSex', 'oneHotEmbarked', 'oneHotFare', 'oneHotAge']
edaAssembler = VectorAssembler(inputCols=edaFeatureCols, outputCol='features')

edaPipeline = Pipeline(stages=[edaEmbarkedIndexer, edaSexIndexer, edaAgeImputer, edaAgeBucketizer,
                               edaFareBucketizer, edaOneHotEncoder, edaAssembler])
chiSqTrain = edaPipeline.fit(train).transform(train)

# Test each slot of the assembled feature vector against the label. `head()`
# takes the first row of the result frame, which carries the pValues,
# degreesOfFreedom and statistics fields printed below.
chiSqResult = ChiSquareTest.test(chiSqTrain, 'features', labelCol).head()
print("pValues: " + str(chiSqResult.pValues))
print("degreesOfFreedom: " + str(chiSqResult.degreesOfFreedom))
print("statistics: " + str(chiSqResult.statistics))

pValues: [0.0,0.0,6.02813466444e-06,4.02603175464e-07,4.93843854699e-11,0.0101897422598,0.0315461645121,4.25298058386e-05,0.00150622342036,0.612884928604,0.116808580457]
degreesOfFreedom: [2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
statistics: [100.980407261,260.756342249,20.4792462347,25.6818141585,43.2014376768,6.60142083307,4.62299205997,16.755010532,10.0709867693,0.255995235761,2.45959925254]


# Classification

In [20]:
from pyspark.ml.feature import StringIndexer

# String -> numeric index encoders for the two string-typed features.
# handleInvalid='skip' asks Spark to filter out rows with invalid values
# instead of raising an error.
sexFeatureIndexer = StringIndexer(
    inputCol='Sex',
    outputCol='indexedSex',
    handleInvalid='skip',
)
embarkedIndexer = StringIndexer(
    inputCol='Embarked',
    outputCol='indexedEmbarked',
    handleInvalid='skip',
)

In [21]:
from pyspark.ml.feature import Bucketizer

# Bucket boundaries for the imputed age and for the fare.
ageSplits = [0, 16, 32, 48, 64, 200]
fareSplits = [-float('inf'), 7.91, 14.454, 31, float('inf')]

# NOTE(review): handleInvalid='skip' filters out rows with invalid values at
# transform time, so a scored frame may contain fewer rows than its input.
# Confirm this is acceptable for the test set before submitting.
ageBucketizer = Bucketizer(
    inputCol='imputedAge',
    outputCol='bucketedAge',
    splits=ageSplits,
    handleInvalid='skip',
)
fareBucketizer = Bucketizer(
    inputCol='Fare',
    outputCol='bucketedFare',
    splits=fareSplits,
    handleInvalid='skip',
)

In [22]:
from pyspark.ml.feature import OneHotEncoderEstimator, VectorIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# NOTE(review): the one-hot columns produced by this stage are not among the
# assembler inputs below; the model is trained on the indexed / bucketed
# columns directly. The stage only adds columns to the transformed frames.
# Confirm whether it is still wanted.
oneHotEncoderEstimator = OneHotEncoderEstimator(
    inputCols=['indexedSex', 'indexedEmbarked', 'bucketedFare', 'bucketedAge'],
    outputCols=['oneHotSex', 'oneHotEmbarked', 'oneHotFare', 'oneHotAge'])

# Feature vector fed to the classifier.
assembler = VectorAssembler(
    inputCols=['Pclass', 'SibSp', 'Parch', 'bucketedAge',
               'bucketedFare', 'indexedEmbarked', 'indexedSex'],
    outputCol='features')

# Random forests are stochastic; pin the seed so that a full re-run of the
# notebook trains the same forest.
rf = RandomForestClassifier(labelCol=labelCol, featuresCol='features', seed=42)

In [23]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# End-to-end pipeline: impute age, index the string columns, bucketize age
# and fare, one-hot encode, assemble the feature vector, then classify.
pipeline = Pipeline(stages=[ageImputer, embarkedIndexer, sexFeatureIndexer, ageBucketizer,
                            fareBucketizer, oneHotEncoderEstimator, assembler, rf])

# Hyper-parameter grid: 4 forest sizes x 2 maximum depths = 8 candidates.
grid = (ParamGridBuilder()
        .addGrid(rf.numTrees, [15, 20, 25, 30])
        .addGrid(rf.maxDepth, [5, 8])
        .build())

# 10-fold cross-validation scored by area under the ROC curve. The seed pins
# the random fold assignment so model selection is repeatable across runs.
cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol=labelCol, metricName='areaUnderROC'),
                    numFolds=10,
                    seed=42)

model = cv.fit(train)
# NOTE: `train` is rebound to the scored frame (original columns plus the
# pipeline's output columns). Re-running this cell on its own would therefore
# fit on the already-scored frame; reload `train` first.
train = model.transform(train)

In [24]:
# Score the predictions on `train` with the evaluator that was used during
# cross-validation. NOTE: `train` is the data the model was fitted on, so
# this is a training-set figure and not a held-out estimate.
evaluator = model.getEvaluator()
trainScore = evaluator.evaluate(train)
trainScore

0.9265509482481523

In [25]:
# Run the fitted model over the test set. `test` is rebound to the scored
# frame, which the following cells read.
scoredTest = model.transform(test)
test = scoredTest

In [26]:
# Preview at most 20 scored test rows, converted to pandas for rich display.
testPreview = test.limit(20)
testPreview.toPandas()

Unnamed: 0,PassengerId,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,...,bucketedAge,bucketedFare,oneHotSex,oneHotEmbarked,oneHotFare,oneHotAge,features,rawPrediction,probability,prediction
0,892,3,"Kelly, Mr. James",male,34.5,0,0,330911,7.8292,,...,2.0,0.0,(1.0),"(0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","(3.0, 0.0, 0.0, 2.0, 0.0, 2.0, 0.0)","[29.2960574888, 0.703942511236]","[0.976535249625, 0.0234647503745]",0.0
1,893,3,"Wilkes, Mrs. James (Ellen Needs)",female,47.0,1,0,363272,7.0,,...,2.0,0.0,(0.0),"(1.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 0.0, 1.0, 0.0)","[3.0, 1.0, 0.0, 2.0, 0.0, 0.0, 1.0]","[17.9363522365, 12.0636477635]","[0.597878407882, 0.402121592118]",0.0
2,894,2,"Myles, Mr. Thomas Francis",male,62.0,0,0,240276,9.6875,,...,3.0,1.0,(1.0),"(0.0, 0.0)","(0.0, 1.0, 0.0)","(0.0, 0.0, 0.0, 1.0)","[2.0, 0.0, 0.0, 3.0, 1.0, 2.0, 0.0]","[26.2911066277, 3.70889337228]","[0.876370220924, 0.123629779076]",0.0
3,895,3,"Wirz, Mr. Albert",male,27.0,0,0,315154,8.6625,,...,1.0,1.0,(1.0),"(1.0, 0.0)","(0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(3.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0)","[25.9086506078, 4.09134939223]","[0.863621686926, 0.136378313074]",0.0
4,896,3,"Hirvonen, Mrs. Alexander (Helga E Lindqvist)",female,22.0,1,1,3101298,12.2875,,...,1.0,1.0,(0.0),"(1.0, 0.0)","(0.0, 1.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","[3.0, 1.0, 1.0, 1.0, 1.0, 0.0, 1.0]","[21.2731321988, 8.72686780122]","[0.709104406626, 0.290895593374]",0.0
5,897,3,"Svensson, Mr. Johan Cervin",male,14.0,0,0,7538,9.225,,...,0.0,1.0,(1.0),"(1.0, 0.0)","(0.0, 1.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(3.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","[21.79357844, 8.20642155996]","[0.726452614668, 0.273547385332]",0.0
6,898,3,"Connolly, Miss. Kate",female,30.0,0,0,330972,7.6292,,...,1.0,0.0,(0.0),"(0.0, 0.0)","(1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","[3.0, 0.0, 0.0, 1.0, 0.0, 2.0, 1.0]","[6.58546403436, 23.4145359656]","[0.219515467812, 0.780484532188]",1.0
7,899,2,"Caldwell, Mr. Albert Francis",male,26.0,1,1,248738,29.0,,...,1.0,2.0,(1.0),"(1.0, 0.0)","(0.0, 0.0, 1.0)","(0.0, 1.0, 0.0, 0.0)","[2.0, 1.0, 1.0, 1.0, 2.0, 0.0, 0.0]","[26.9219114219, 3.07808857809]","[0.897397047397, 0.102602952603]",0.0
8,900,3,"Abrahim, Mrs. Joseph (Sophie Halaut Easu)",female,18.0,0,0,2657,7.2292,,...,1.0,0.0,(0.0),"(0.0, 1.0)","(1.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","[3.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0]","[5.95376028838, 24.0462397116]","[0.198458676279, 0.801541323721]",1.0
9,901,3,"Davies, Mr. John Samuel",male,21.0,2,0,A/4 48871,24.15,,...,1.0,2.0,(1.0),"(1.0, 0.0)","(0.0, 0.0, 1.0)","(0.0, 1.0, 0.0, 0.0)","[3.0, 2.0, 0.0, 1.0, 2.0, 0.0, 0.0]","[27.683257006, 2.31674299405]","[0.922775233532, 0.0772247664683]",0.0


Write the predictions to a CSV file in the format specified by Kaggle.

In [27]:
from pyspark.sql.types import IntegerType

csvPath = 'prediction.csv'

# Submission frame: the passenger id plus the prediction cast to an integer
# and exposed under the name `Survived`.
submission = test.select(
    'PassengerId',
    test['prediction'].cast(IntegerType()).alias('Survived'),
)

# coalesce(1) collapses the frame to a single partition before writing.
# mode='overwrite' replaces any earlier output at `csvPath`; the previous
# mode='ignore' silently skipped the write when the path already existed,
# leaving stale predictions behind after a re-run.
submission.coalesce(1).write.csv(csvPath, header='true', mode='overwrite')