In [1]:
import findspark
findspark.init('/Users/ryanshin/Downloads/spark-2.3.1-bin-hadoop2.7')
import pyspark
sc = pyspark.SparkContext()

In [2]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def age_group_func(age):
 return int(age / 10)
age_group = udf(age_group_func, returnType=IntegerType())

spark = SparkSession(sc)

titanic = spark.read.option("header", "true") \
 .csv("/Users/ryanshin/Downloads/train.csv") \
 .withColumn("Survived", col("Survived").cast("double")) \
 .withColumn("label", col("Survived")) \
 .withColumn("Pclass", col("Pclass").cast("double"))\
 .withColumn("SibSp", col("SibSp").cast("double"))\
 .withColumn("Parch", col("Parch").cast("double"))\
 .withColumn("Fare", col("Fare").cast("double"))\
 .withColumn("Age", col("Age").cast("int"))\
 .na.fill("S", "Embarked") \
 .na.fill(-1, "Age") \
 .withColumn("age_group", age_group(col("Age")))
titanic.printSchema()

root
 |-- PassengerId: string (nullable = true)
 |-- Survived: double (nullable = true)
 |-- Pclass: double (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- SibSp: double (nullable = true)
 |-- Parch: double (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = false)
 |-- label: double (nullable = true)
 |-- age_group: integer (nullable = true)



In [3]:
titanic.show()

+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-----+---------+
|PassengerId|Survived|Pclass| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|label|age_group|
+-----------+--------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+-----+---------+
| 1| 0.0| 3.0|Braund, Mr. Owen ...| male| 22| 1.0| 0.0| A/5 21171| 7.25| null| S| 0.0| 2|
| 2| 1.0| 1.0|Cumings, Mrs. Joh...|female| 38| 1.0| 0.0| PC 17599|71.2833| C85| C| 1.0| 3|
| 3| 1.0| 3.0|Heikkinen, Miss. ...|female| 26| 0.0| 0.0|STON/O2. 3101282| 7.925| null| S| 1.0| 2|
| 4| 1.0| 1.0|Futrelle, Mrs. Ja...|female| 35| 1.0| 0.0| 113803| 53.1| C123| S| 1.0| 3|
| 5| 0.0| 3.0|Allen, Mr. Willia...| male| 35| 0.0| 0.0| 373450| 8.05| null| S| 0.0| 3|
| 6| 0.0| 3.0| Moran, Mr. James| male| -1| 0.0| 0.0| 330877| 8.4583| null| Q| 0.0| 0|
| 7| 0.0| 1.0|McCarthy, Mr. Tim...| male| 54| 0.0| 0.0| 17463|51.8625| E46| S| 0.0| 5|
| 8| 0.0| 3

In [4]:
titanic.count()

891

In [5]:
# 다 죽었다고 예측
def predict1_func(dummy):
 return 0.0
predict1 = udf(predict1_func, returnType=DoubleType())
 
# 여자는 다 살았다고 남자는 다 죽었다고 예측
def predict2_func(gender):
 if gender == "female":
 return 1.0
 else:
 return 0.0 
predict2 = udf(predict2_func, returnType=DoubleType())
 
# UDF 생성
prediction1result = titanic.select(predict1("Sex").alias("prediction"), col("Survived").cast("double").alias("label"))
prediction2result = titanic.select(predict2("Sex").alias("prediction"), col("Survived").cast("double").alias("label"))

from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.setRawPredictionCol("prediction").setLabelCol("label")

evaluator.setMetricName("areaUnderROC")
print("prediction1result areaUnderROC=%f" % evaluator.evaluate(prediction1result))
print("prediction2result areaUnderROC=%f" % evaluator.evaluate(prediction2result))

evaluator.setMetricName("areaUnderPR")
print("prediction1result areaUnderPR=%f" % evaluator.evaluate(prediction1result))
print("prediction2result areaUnderPR=%f" % evaluator.evaluate(prediction2result))

prediction1result areaUnderROC=0.500000
prediction2result areaUnderROC=0.766873
prediction1result areaUnderPR=0.383838
prediction2result areaUnderPR=0.684957


* 다 죽었다고 예측시 정확도

In [6]:
prediction1result = prediction1result.withColumn('accuracy',col('prediction')== col('label'))
prediction1result = prediction1result.where(prediction1result.prediction==prediction1result.label)
prediction1result.count() / float(titanic.count())

0.6161616161616161

* 여자는 다 살았다고 남자는 다 죽었다고 예측시 정확도

In [7]:
prediction2result = prediction2result.withColumn('accuracy',col('prediction')== col('label'))
prediction2result = prediction2result.where(prediction2result.prediction==prediction2result.label)
prediction2result.count() / float(titanic.count())

0.7867564534231201

In [8]:
titanic.withColumn("age_group", age_group(col("Age"))).groupBy("age_group", "Sex") \
 .agg(sum("Survived"), count("Survived"), sum("Survived")/count("Survived")) \
 .orderBy("Sex", "age_group").show()

+---------+------+-------------+---------------+---------------------------------+
|age_group| Sex|sum(Survived)|count(Survived)|(sum(Survived) / count(Survived))|
+---------+------+-------------+---------------+---------------------------------+
| 0|female| 55.0| 83| 0.6626506024096386|
| 1|female| 34.0| 45| 0.7555555555555555|
| 2|female| 52.0| 72| 0.7222222222222222|
| 3|female| 50.0| 60| 0.8333333333333334|
| 4|female| 22.0| 32| 0.6875|
| 5|female| 16.0| 18| 0.8888888888888888|
| 6|female| 4.0| 4| 1.0|
| 0| male| 35.0| 156| 0.22435897435897437|
| 1| male| 7.0| 57| 0.12280701754385964|
| 2| male| 25.0| 148| 0.16891891891891891|
| 3| male| 23.0| 107| 0.21495327102803738|
| 4| male| 12.0| 57| 0.21052631578947367|
| 5| male| 4.0| 30| 0.13333333333333333|
| 6| male| 2.0| 15| 0.13333333333333333|
| 7| male| 0.0| 6| 0.0|
| 8| male| 1.0| 1| 1.0|
+---------+------+-------------+---------------+---------------------------------+



In [9]:
training_data, test_data = titanic.randomSplit([0.7, 0.3])

In [10]:
from pyspark.ml.classification import *
from pyspark.ml.feature import *

# Train a RandomForest model.
rf = RandomForestClassifier() \
 .setNumTrees(20)

sex_indexer = StringIndexer().setInputCol("Sex").setOutputCol("sex_idx").fit(titanic)
embark_indexer = StringIndexer().setInputCol("Embarked").setOutputCol("embark_idx").fit(titanic)

assembler = VectorAssembler() \
 .setInputCols(["Pclass", "SibSp", "Parch", "age_group", "sex_idx"]) \
 .setOutputCol("features")


from pyspark.ml import Pipeline

# Chain indexers and forest in a Pipeline.
pipeline = Pipeline().setStages([sex_indexer, embark_indexer, assembler, rf])

pipeline_model = pipeline.fit(training_data)

prediction4result = pipeline_model.transform(test_data)

In [11]:
evaluator.setMetricName("areaUnderROC")
print(evaluator.evaluate(prediction4result))
print(" ")
evaluator.setMetricName("areaUnderPR")
print(evaluator.evaluate(prediction4result))

0.7497701149425287
 
0.7079536774334352


In [12]:
titanic_test = spark.read.option("header", "true") \
 .csv("/Users/ryanshin/Downloads/test.csv") \
 .withColumn("Pclass", col("Pclass").cast("double"))\
 .withColumn("SibSp", col("SibSp").cast("double"))\
 .withColumn("Parch", col("Parch").cast("double"))\
 .withColumn("Fare", col("Fare").cast("double"))\
 .withColumn("Age", col("Age").cast("int"))\
 .na.fill("S", "Embarked") \
 .na.fill(-1, "Age") \
 .withColumn("age_group", age_group(col("Age")))
titanic_test.show()

+-----------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+---------+
|PassengerId|Pclass| Name| Sex|Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|age_group|
+-----------+------+--------------------+------+---+-----+-----+----------------+-------+-----+--------+---------+
| 892| 3.0| Kelly, Mr. James| male| 34| 0.0| 0.0| 330911| 7.8292| null| Q| 3|
| 893| 3.0|Wilkes, Mrs. Jame...|female| 47| 1.0| 0.0| 363272| 7.0| null| S| 4|
| 894| 2.0|Myles, Mr. Thomas...| male| 62| 0.0| 0.0| 240276| 9.6875| null| Q| 6|
| 895| 3.0| Wirz, Mr. Albert| male| 27| 0.0| 0.0| 315154| 8.6625| null| S| 2|
| 896| 3.0|Hirvonen, Mrs. Al...|female| 22| 1.0| 1.0| 3101298|12.2875| null| S| 2|
| 897| 3.0|Svensson, Mr. Joh...| male| 14| 0.0| 0.0| 7538| 9.225| null| S| 1|
| 898| 3.0|Connolly, Miss. Kate|female| 30| 0.0| 0.0| 330972| 7.6292| null| Q| 3|
| 899| 2.0|Caldwell, Mr. Alb...| male| 26| 1.0| 1.0| 248738| 29.0| null| S| 2|
| 900| 3.0|Abrahim, Mrs. Jos...|female| 18| 0

In [13]:
result = pipeline_model.transform(titanic_test).select(col('PassengerId'), col('prediction').alias("Survived").cast('int'))
result.show()

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
| 892| 0|
| 893| 0|
| 894| 0|
| 895| 0|
| 896| 0|
| 897| 0|
| 898| 0|
| 899| 0|
| 900| 1|
| 901| 0|
| 902| 0|
| 903| 0|
| 904| 1|
| 905| 0|
| 906| 1|
| 907| 1|
| 908| 0|
| 909| 0|
| 910| 1|
| 911| 0|
+-----------+--------+
only showing top 20 rows



In [14]:
%%bash
rm -rf answer

In [15]:
result.repartition(1).write.csv('answer', header='true')

In [16]:
%%bash
cat answer/part*

PassengerId,Survived
892,0
893,0
894,0
895,0
896,0
897,0
898,0
899,0
900,1
901,0
902,0
903,0
904,1
905,0
906,1
907,1
908,0
909,0
910,1
911,0
912,0
913,1
914,1
915,0
916,1
917,0
918,1
919,0
920,0
921,0
922,0
923,0
924,0
925,0
926,1
927,0
928,1
929,0
930,0
931,0
932,0
933,0
934,0
935,1
936,1
937,0
938,0
939,0
940,1
941,0
942,1
943,0
944,1
945,1
946,0
947,0
948,0
949,0
950,0
951,1
952,0
953,0
954,0
955,0
956,0
957,1
958,1
959,0
960,0
961,1
962,0
963,0
964,0
965,0
966,1
967,0
968,0
969,1
970,0
971,0
972,1
973,0
974,0
975,0
976,0
977,0
978,0
979,1
980,1
981,1
982,1
983,0
984,1
985,0
986,0
987,0
988,1
989,0
990,0
991,0
992,1
993,0
994,0
995,0
996,0
997,0
998,0
999,0
1000,0
1001,0
1002,0
1003,1
1004,1
1005,1
1006,1
1007,0
1008,0
1009,1
1010,0
1011,1
1012,1
1013,0
1014,1
1015,0
1016,0
1017,1
1018,0
1019,1
1020,0
1021,0
1022,0
1023,0
1024,0
1025,0
1026,0
1027,0
1028,0
1029,0
1030,0
1031,0
1032,0
1033,1
1034,0
1035,0
1036,0
1037,0
1038,0
1039,0
1040,0
1041,0
1042,1
1043,0
1044,0
1045,0
1046,0
10