## Predicting diabetes readmission using *Spark.ml*, Spark 2.3.0 (Python API)

* Building a Spark session, the entry point that gives us access to DataFrames, Spark SQL and Spark.ml (and, through `spark.sparkContext`, to Spark Core)
* I ran into a bug on Windows, so I had to import `findspark` and initialize it before creating the Spark session in the usual way

In [3]:
import findspark
findspark.init()  # locates the Spark installation so that pyspark can be imported as a library
import pyspark
from pyspark.sql import SparkSession as SS  # entry point for DataFrames and Spark.ml

# Now instantiate the Spark session using the builder
spark = SS.builder.master("local").appName("Predicting-Diabetes-Readmission").getOrCreate()

* If the data is clean, we can import it directly with `spark.read` (a `DataFrameReader`), which reads the data in as a DataFrame. DataFrames support many operations, but for some complex cleaning it can be easier to work with the underlying RDD.


* In this dataset, missing values are coded as "?", so I first import the data as a DataFrame, use its RDD form to replace "?" with the Python `None` object, and then create a new Spark DataFrame from the result so that the missing values are read in as nulls.

In [4]:
rawData = spark.read\
 .format("csv")\
 .option("header", "true")\
 .load("diabetic_data.csv")

rawData.take(1)

[Row(encounter_id='2278392', patient_nbr='8222157', race='Caucasian', gender='Female', age='[0-10)', weight='?', admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code='?', medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2='?', diag_3='?', number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', rea

* The output above shows that variables such as weight and payer_code contain "?", which marks a missing value



* Converting the DataFrame to an RDD, replacing "?" with the Python `None` object using `map`, and converting the RDD back to a DataFrame



* Also dropping encounter_id and patient_nbr, which are keys (identifiers) and are not needed for prediction

In [5]:
rawData = rawData.drop('encounter_id', 'patient_nbr')

from pyspark.sql.types import *
field_names = rawData.columns
# Every column is declared as a nullable string; numeric columns are cast later
fields = [StructField(field_name, StringType(), True) for field_name in field_names]
# Replace each "?" with None so that Spark stores it as null
rddWithoutQues = rawData.rdd.map(lambda x: [None if string == "?" else string for string in x])
schema = StructType(fields)
diabetes = spark.createDataFrame(rddWithoutQues, schema)
diabetes.take(1)

[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')]

### Changing the levels of the response

* Our response variable has the categories ">30", "<30" and "NO". We want to combine ">30" and "<30" into a single category "Yes" (and "NO" becomes "No").



* We do this by defining a user-defined function (UDF) and passing it to the *.withColumn(col_name, function(col_name))* method of a Spark DataFrame

In [6]:
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def modify_values(r):
    # ">30" and "<30" both mean the patient was readmitted
    if r == ">30" or r == "<30":
        return "Yes"
    else:
        return "No"

ol_val = udf(modify_values, StringType())
diabetes = diabetes.withColumn("readmitted", ol_val(diabetes.readmitted))
diabetes.take(1)

[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='No')]

### Calculate the number of missing values per column

* The `select` in the following code gives the number of missing (null) values in each column. Previously, missing values were coded as "?", which this check would not have counted; now that we have converted them to Python None objects, Spark stores them as nulls and they are included.

In [7]:
from pyspark.sql.functions import when, count, col
# For each column, count the rows where the value is null
diabetes.select([count(when(col(c).isNull(), c)).alias(c) for c in diabetes.columns]).show()

+----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+
|race|gender|age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|max_glu_serum|A1Cresult|metformin|repaglinide|nateglinide|chlorpropamide|glimepirid

* We can see that the columns **'weight', 'payer_code' and 'medical_specialty'** have many missing values



* Let's go ahead and delete these columns

In [8]:
diabetes = diabetes.drop('weight', 'payer_code', 'medical_specialty')
#diabetes.take(1)

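* Converting a few features that are numeric but were read in as strings. Every column is a string at this point because the CSV was read without schema inference and the schema above declares each field as StringType


* Dropping rows with NA values


* I did not find a more concise way to do this, so I hardcoded the column names.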

In [9]:
diabetes = diabetes.withColumn("diag_2", diabetes["diag_2"].cast(DoubleType()))
diabetes = diabetes.withColumn("diag_3", diabetes["diag_3"].cast(DoubleType()))
diabetes = diabetes.withColumn("diag_1", diabetes["diag_1"].cast(DoubleType()))
diabetes = diabetes.withColumn("time_in_hospital", diabetes["time_in_hospital"].cast(DoubleType()))
diabetes = diabetes.withColumn("num_lab_procedures", diabetes["num_lab_procedures"].cast(DoubleType()))
diabetes = diabetes.withColumn("num_medications", diabetes["num_medications"].cast(DoubleType()))
diabetes = diabetes.withColumn("number_emergency", diabetes["number_emergency"].cast(DoubleType()))
diabetes = diabetes.withColumn("number_inpatient", diabetes["number_inpatient"].cast(DoubleType()))
diabetes = diabetes.withColumn("number_diagnoses", diabetes["number_diagnoses"].cast(DoubleType()))

# Dropping the remaining few NA rows
diabetes = diabetes.dropna()

#diabetes.take(1)

* I used a list of redundant and unbalanced features that I had identified in R


* Deleting these features

In [10]:
# Redundant and unbalanced feature list imported from R
diabetes = diabetes.drop('examide', 'citoglipton', 'metformin-rosiglitazone', 'metformin-pioglitazone',
 'glimepiride-pioglitazone', 'acetohexamide',
 'repaglinide', 'nateglinide', 'chlorpropamide', 'tolbutamide', 'acarbose', 'miglitol',
 'troglitazone', 'tolazamide', 'glyburide-metformin', 'glipizide-metformin')
diabetes

DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string]

### Preparing for the modelling stage

* Spark.ml algorithms work only with numeric data: the features must be numeric (assembled into a vector) and so must the label


* We will use the StringIndexer class to index each categorical column


* We will leave the columns that are already doubles as they are

In [11]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Creating a list of the STRING dtype columns
cols_to_index = [name for name, dtype in diabetes.dtypes if dtype == "string"]

# Creating an indexer for each column
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in cols_to_index]

# Passing the indexers to a Pipeline, which applies them as successive "stages";
# pipeline.fit() fits each StringIndexer, so they do not need to be fitted beforehand
pipeline = Pipeline(stages=indexers)
diabetes_indexed = pipeline.fit(diabetes).transform(diabetes)
diabetes_indexed
diabetes_indexed

DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string, race_index: double, gender_index: double, age_index: double, admission_type_id_index: double, discharge_disposition_id_index: double, admission_source_id_index: double, num_procedures_index: double, number_outpatient_index: double, max_glu_serum_index: double, A1Cresult_index: double, metformin_index: double, glimepiride_index: double, glipizide_index: double,
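By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common value of each column gets index 0.0. A plain-Python sketch of that mapping, for illustration only; `frequency_desc_index` is a hypothetical helper, and breaking ties alphabetically is an assumption made here to keep the sketch deterministic, not a claim about how Spark orders ties.

```python
from collections import Counter


def frequency_desc_index(values):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically purely to make this sketch deterministic.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical response column after the "Yes"/"No" recoding
labels = ["No", "Yes", "No", "No", "Yes"]
mapping = frequency_desc_index(labels)
print(mapping)                      # {'No': 0.0, 'Yes': 1.0}
print([mapping[v] for v in labels])  # [0.0, 1.0, 0.0, 0.0, 1.0]
```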

* Since the indexed columns now have the suffix "_index", we delete the original columns


* Also renaming "readmitted_index" back to "readmitted", our response variable

In [12]:
diabetes_indexed = diabetes_indexed.drop('race',
 'gender',
 'age',
 'admission_type_id',
 'discharge_disposition_id',
 'admission_source_id',
 'num_procedures',
 'number_outpatient',
 'max_glu_serum',
 'A1Cresult',
 'metformin',
 'glimepiride',
 'glipizide',
 'glyburide',
 'pioglitazone',
 'rosiglitazone',
 'insulin',
 'change',
 'diabetesMed',
 'readmitted')

diabetes_indexed = diabetes_indexed.withColumnRenamed("readmitted_index", "readmitted")

### ChiSq feature selection

* Spark.ml offers several feature selectors


* I tried chi-squared feature selection (ChiSqSelector) and got it to run


* I decided not to include it in my analysis, as it needs more work

In [20]:
#from pyspark.ml.linalg import Vectors
#from pyspark.ml.feature import ChiSqSelector

#def Vectorize(data):
# return data.rdd.map(lambda r: [Vectors.dense(r[0:19]), r[19]]).toDF(["features","readmitted"])

#diabetes_indexed_vectorized = Vectorize(diabetes_indexed)

#selector = ChiSqSelector(numTopFeatures=7, featuresCol="features",
# outputCol="selectedFeatures", labelCol="readmitted")


#diabetes_chsq = selector.fit(diabetes_indexed_vectorized).transform(diabetes_indexed_vectorized)
#diabetes_chsq = diabetes_chsq.drop('features')

### One Hot Encoding

* Creating a list of the features to be one-hot encoded (OHE)

In [13]:
# Copy rather than alias, so that removing the label does not also modify cols_to_index
cols_to_ohe = [c for c in cols_to_index if c != "readmitted"]
cols_to_ohe_ = [c + "_index" for c in cols_to_ohe]
cols_to_ohe_

['race_index',
 'gender_index',
 'age_index',
 'admission_type_id_index',
 'discharge_disposition_id_index',
 'admission_source_id_index',
 'num_procedures_index',
 'number_outpatient_index',
 'max_glu_serum_index',
 'A1Cresult_index',
 'metformin_index',
 'glimepiride_index',
 'glipizide_index',
 'glyburide_index',
 'pioglitazone_index',
 'rosiglitazone_index',
 'insulin_index',
 'change_index',
 'diabetesMed_index']

* OHE code

* The output of OHE is one sparse vector per encoded column

In [14]:
output_ohe_cols = [x+"_vector" for x in cols_to_ohe_]

# OneHotEncoderEstimator is the Spark 2.3/2.4 name; Spark 3.0 renamed it to OneHotEncoder
from pyspark.ml.feature import OneHotEncoderEstimator
encoder = OneHotEncoderEstimator(inputCols=cols_to_ohe_,
 outputCols=output_ohe_cols)

model_ohe = encoder.fit(diabetes_indexed)
diabetes_ohe = model_ohe.transform(diabetes_indexed)
diabetes_ohe.take(1)

[Row(time_in_hospital=3.0, num_lab_procedures=59.0, num_medications=18.0, number_emergency=0.0, number_inpatient=0.0, diag_1=276.0, diag_2=250.01, diag_3=255.0, number_diagnoses=9.0, race_index=0.0, gender_index=0.0, age_index=8.0, admission_type_id_index=0.0, discharge_disposition_id_index=0.0, admission_source_id_index=0.0, num_procedures_index=0.0, number_outpatient_index=0.0, max_glu_serum_index=0.0, A1Cresult_index=0.0, metformin_index=0.0, glimepiride_index=0.0, glipizide_index=0.0, glyburide_index=0.0, pioglitazone_index=0.0, rosiglitazone_index=0.0, insulin_index=3.0, change_index=1.0, diabetesMed_index=0.0, readmitted=1.0, metformin_index_vector=SparseVector(3, {0: 1.0}), admission_source_id_index_vector=SparseVector(15, {0: 1.0}), rosiglitazone_index_vector=SparseVector(3, {0: 1.0}), glimepiride_index_vector=SparseVector(3, {0: 1.0}), discharge_disposition_id_index_vector=SparseVector(25, {0: 1.0}), glipizide_index_vector=SparseVector(3, {0: 1.0}), max_glu_serum_index_vector=
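`OneHotEncoderEstimator` uses `dropLast=True` by default: a column with k categories becomes a vector of length k - 1, and the last category is encoded as all zeros. That is why `metformin_index_vector` above is a `SparseVector(3, ...)`, which implies four metformin levels. A plain-Python sketch of the encoding and of the resulting width of the assembled feature vector; both function names are hypothetical.

```python
def one_hot_drop_last(index, num_categories):
    """Dense-list equivalent of a one-hot vector with dropLast=True."""
    size = num_categories - 1
    vec = [0.0] * size
    if index < size:  # the last category has no slot and stays all zeros
        vec[int(index)] = 1.0
    return vec


def encoded_width(num_numeric, category_counts):
    """Assembled width: one slot per numeric column plus (k - 1) per categorical column."""
    return num_numeric + sum(k - 1 for k in category_counts)


print(one_hot_drop_last(0.0, 4))  # [1.0, 0.0, 0.0]
print(one_hot_drop_last(3.0, 4))  # [0.0, 0.0, 0.0]
print(encoded_width(9, [4, 2]))   # 9 + 3 + 1 = 13
```

Applied with the real number of levels per column, `encoded_width` should reproduce the 144 slots of the `features` vector shown later.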

* Dropping the index columns that have now been one-hot encoded; Spark doesn't do that for us

In [15]:
diabetes_ohe = diabetes_ohe.drop('race_index',
 'gender_index',
 'age_index',
 'admission_type_id_index',
 'discharge_disposition_id_index',
 'admission_source_id_index',
 'num_procedures_index',
 'number_outpatient_index',
 'max_glu_serum_index',
 'A1Cresult_index',
 'metformin_index',
 'glimepiride_index',
 'glipizide_index',
 'glyburide_index',
 'pioglitazone_index',
 'rosiglitazone_index',
 'insulin_index',
 'change_index',
 'diabetesMed_index')
diabetes_ohe

DataFrame[time_in_hospital: double, num_lab_procedures: double, num_medications: double, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, readmitted: double, metformin_index_vector: vector, admission_source_id_index_vector: vector, rosiglitazone_index_vector: vector, glimepiride_index_vector: vector, discharge_disposition_id_index_vector: vector, glipizide_index_vector: vector, max_glu_serum_index_vector: vector, gender_index_vector: vector, number_outpatient_index_vector: vector, race_index_vector: vector, diabetesMed_index_vector: vector, admission_type_id_index_vector: vector, A1Cresult_index_vector: vector, change_index_vector: vector, glyburide_index_vector: vector, age_index_vector: vector, insulin_index_vector: vector, pioglitazone_index_vector: vector, num_procedures_index_vector: vector]

In [16]:
diabetes_ohe.show(1)

+----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+
|time_in_hospital|num_lab_procedures|num_medications|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|readmitted|metformin_index_vector|admission_source_id_index_vector|rosiglitazone_index_vector|glimepiride_index_vector|discharge_disposition_id_index_vector|glipizide_index_vector|max_glu_serum_index_vector|gender_index_vector|number_outpatient_index_vector|r

* It's a bit difficult to see, but each column holds either a double value or a sparse vector.


* We need to assemble these features into the single vector column that Spark.ml algorithms expect.


* We will use VectorAssembler to do that

In [18]:
# The following code takes all of these columns and combines them into one column named "features",
# which holds ALL features of one record (row) as a single vector

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
 inputCols=['time_in_hospital',
 'num_lab_procedures',
 'num_medications',
 'number_emergency',
 'number_inpatient',
 'diag_1',
 'diag_2',
 'diag_3',
 'number_diagnoses',
 'metformin_index_vector',
 'admission_source_id_index_vector',
 'rosiglitazone_index_vector',
 'glimepiride_index_vector',
 'discharge_disposition_id_index_vector',
 'glipizide_index_vector',
 'max_glu_serum_index_vector',
 'gender_index_vector',
 'number_outpatient_index_vector',
 'race_index_vector',
 'diabetesMed_index_vector',
 'admission_type_id_index_vector',
 'A1Cresult_index_vector',
 'change_index_vector',
 'glyburide_index_vector',
 'age_index_vector',
 'insulin_index_vector',
 'pioglitazone_index_vector',
 'num_procedures_index_vector'],
 outputCol="features")

output = assembler.transform(diabetes_ohe)

output = output.drop('time_in_hospital',
 'num_lab_procedures',
 'num_medications',
 'number_emergency',
 'number_inpatient',
 'diag_1',
 'diag_2',
 'diag_3',
 'number_diagnoses',
 'metformin_index_vector',
 'admission_source_id_index_vector',
 'rosiglitazone_index_vector',
 'glimepiride_index_vector',
 'discharge_disposition_id_index_vector',
 'glipizide_index_vector',
 'max_glu_serum_index_vector',
 'gender_index_vector',
 'number_outpatient_index_vector',
 'race_index_vector',
 'diabetesMed_index_vector',
 'admission_type_id_index_vector',
 'A1Cresult_index_vector',
 'change_index_vector',
 'glyburide_index_vector',
 'age_index_vector',
 'insulin_index_vector',
 'pioglitazone_index_vector',
 'num_procedures_index_vector')

output.show(5)

+----------+--------------------+
|readmitted| features|
+----------+--------------------+
| 1.0|(144,[0,1,2,5,6,7...|
| 0.0|(144,[0,1,2,5,6,7...|
| 0.0|(144,[0,1,2,5,6,7...|
| 1.0|(144,[0,1,2,5,6,7...|
| 1.0|(144,[0,1,2,5,6,7...|
+----------+--------------------+
only showing top 5 rows



### RANDOM FOREST

* Training a random forest on an 80/20 training/testing split

* Testing the model on the test set and printing the accuracy

* **Accuracy = 63.42%**

In [19]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

training, testing = output.randomSplit([0.8,0.2])
rf = RandomForestClassifier(numTrees=100, maxDepth=6, labelCol="readmitted", seed=42,
 featureSubsetStrategy='onethird')
model = rf.fit(training)

predictions = model.transform(testing)

# Compare each prediction with the true label and compute the test accuracy
evaluator = MulticlassClassificationEvaluator(
 labelCol="readmitted", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

Accuracy = 0.634299


### Logistic Regression

* Training logistic regression with elastic net regularization

* **Accuracy = 53.9%**

In [21]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8, labelCol="readmitted", 
 featuresCol="features")
lrModel = lr.fit(training)

preds_lr = lrModel.transform(testing)

# Compare each prediction with the true label and compute the test accuracy
evaluator = MulticlassClassificationEvaluator(
 labelCol="readmitted", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(preds_lr)
print("Accuracy = %g" % accuracy)

Accuracy = 0.539396
