{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Predicting diabetes readmission using *Spark.ml*, Spark 2.3.0 (Python API)\n", "\n", "* Building a spark session which would give us access to all the things which Spark Core has to offer\n", "* I was facing a bug on windows so I had to import findspark and initialize it before calling the spark session in a regular way" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import findspark\n", "findspark.init() # this allows you to import pyspark as a library\n", "import pyspark\n", "from pyspark.sql import SparkSession as SS # to get your spark session.\n", "\n", "# Now instantiate your spark session using builder..\n", "spark= SS.builder.master(\"local\").appName(\"Predcting-Diabetes-Readmission\").getOrCreate()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* If the data is clean then we can directly import the data using SparkSession.read() - This would read in the data as a Data Frame. We can do many different things with the data frame but for some complex cleaning, working with RDDs is preferred.\n", "\n", "\n", "* In this dataset, there are missing values coded as \"?\" so I will first import the data as a DF but use its RDD form to replace \"?\" with Python None object and then create a Spark DataFrame from the resulting data so that the missing values are read in as missing." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(encounter_id='2278392', patient_nbr='8222157', race='Caucasian', gender='Female', age='[0-10)', weight='?', admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code='?', medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2='?', diag_3='?', number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rawData = spark.read\\\n", " .format(\"csv\")\\\n", " .option(\"header\", \"true\")\\\n", " .load(\"diabetic_data.csv\")\n", "\n", "rawData.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* We can see above that variable weight, payer_code etc.. 
having \"?\" values which are missing values\n", "\n", "\n", "\n", "* Converting DF to RDD, replacing \"?\" by Python None object using map and reconverting RDD to DF\n", "\n", "\n", "\n", "* Also, dropping encounter_id and patient_nbr which are keys and won't be needed for prediction" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='NO')]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rawData = rawData.drop('encounter_id', 'patient_nbr')\n", "\n", "from pyspark.sql.types import *\n", "field_names = rawData.columns\n", "fields = [StructField(field_name, StringType(), True) for field_name in field_names]\n", "rddWithoutQues = rawData.rdd.map(lambda x: [None if string == \"?\" else string for string in x])\n", "schema = StructType(fields)\n", "diabetes = spark.createDataFrame(rddWithoutQues, schema)\n", "diabetes.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Changing levels the response\n", "\n", "* Our response variable had categories \">30\" \"<30\" and \"NO\". 
We want \">30\" and \"<30\" to be combined into 1 category \"Yes\".\n", "\n", "\n", "\n", "* We do this by definin a User Defined Function and passing it into *.withColumn(col_name, function(col_name))* method of a Spark Data Frame" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(race='Caucasian', gender='Female', age='[0-10)', weight=None, admission_type_id='6', discharge_disposition_id='25', admission_source_id='1', time_in_hospital='1', payer_code=None, medical_specialty='Pediatrics-Endocrinology', num_lab_procedures='41', num_procedures='0', num_medications='1', number_outpatient='0', number_emergency='0', number_inpatient='0', diag_1='250.83', diag_2=None, diag_3=None, number_diagnoses='1', max_glu_serum='None', A1Cresult='None', metformin='No', repaglinide='No', nateglinide='No', chlorpropamide='No', glimepiride='No', acetohexamide='No', glipizide='No', glyburide='No', tolbutamide='No', pioglitazone='No', rosiglitazone='No', acarbose='No', miglitol='No', troglitazone='No', tolazamide='No', examide='No', citoglipton='No', insulin='No', glyburide-metformin='No', glipizide-metformin='No', glimepiride-pioglitazone='No', metformin-rosiglitazone='No', metformin-pioglitazone='No', change='No', diabetesMed='No', readmitted='No')]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.types import StringType\n", "from pyspark.sql.functions import udf\n", "\n", "def modify_values(r):\n", " if r == \">30\" or r ==\"<30\":\n", " return \"Yes\"\n", " else:\n", " return \"No\"\n", "ol_val = udf(modify_values, StringType())\n", "diabetes = diabetes.withColumn(\"readmitted\",ol_val(diabetes.readmitted))\n", "diabetes.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Calculate the number of missing values per column\n", "\n", "* The line 2 in following code will give us the total number of missing values in each column. Previously our data had \"?\" as missing which wouldn't have been captured using this function. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Calculate the number of missing values per column\n", "\n", "* Line 2 in the following code gives us the total number of missing values in each column. Previously our data had \"?\" for missing values, which would not have been captured by this function. Now that we have converted them to Python None objects, they will be counted here" ] },
{ "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+\n", "|race|gender|age|weight|admission_type_id|discharge_disposition_id|admission_source_id|time_in_hospital|payer_code|medical_specialty|num_lab_procedures|num_procedures|num_medications|number_outpatient|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|max_glu_serum|A1Cresult|metformin|repaglinide|nateglinide|chlorpropamide|glimepiride|acetohexamide|glipizide|glyburide|tolbutamide|pioglitazone|rosiglitazone|acarbose|miglitol|troglitazone|tolazamide|examide|citoglipton|insulin|glyburide-metformin|glipizide-metformin|glimepiride-pioglitazone|metformin-rosiglitazone|metformin-pioglitazone|change|diabetesMed|readmitted|\n", "+----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+\n", "|2273| 0| 0| 98569| 0| 0| 0| 0| 40256| 49949| 0| 0| 0| 0| 0| 0| 21| 358| 1423| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|\n", "+----+------+---+------+-----------------+------------------------+-------------------+----------------+----------+-----------------+------------------+--------------+---------------+-----------------+----------------+----------------+------+------+------+----------------+-------------+---------+---------+-----------+-----------+--------------+-----------+-------------+---------+---------+-----------+------------+-------------+--------+--------+------------+----------+-------+-----------+-------+-------------------+-------------------+------------------------+-----------------------+----------------------+------+-----------+----------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import when, count, col\n", "diabetes.select([count(when(col(c).isNull(), c)).alias(c) for c in diabetes.columns]).show()" ] },
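{ "cell_type": "markdown", "metadata": {}, "source": [ "* To make \"many missing values\" concrete, we can turn these counts into fractions of the total row count; weight, for instance, is missing for roughly 97% of the encounters. A quick sketch, assuming the three heavily missing columns identified below:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: fraction of nulls per column for the heavily missing columns\n", "from pyspark.sql.functions import when, count, col\n", "\n", "mostly_missing = ['weight', 'payer_code', 'medical_specialty']\n", "total = diabetes.count()\n", "null_counts = diabetes.select(\n", "    [count(when(col(c).isNull(), c)).alias(c) for c in mostly_missing]\n", ").first()\n", "for c in mostly_missing:\n", "    print(c, round(null_counts[c] / total, 3))" ] },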
{ "cell_type": "markdown", "metadata": {}, "source": [ "* We can see that the columns **'weight', 'payer_code' and 'medical_specialty'** have many missing values\n", "\n", "\n", "\n", "* Let's go ahead and delete these columns" ] },
{ "cell_type": "code", "execution_count": 8, "metadata": { "scrolled": true }, "outputs": [], "source": [ "diabetes = diabetes.drop('weight', 'payer_code', 'medical_specialty')\n", "#diabetes.take(1)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "* Converting a few features which are numeric but were parsed as strings because of the \"?\" values\n", "\n", "\n", "* Dropping rows with NA values.\n", "\n", "\n", "* I did not find an efficient way to do this at the time, so I hardcoded the casts; a loop-based alternative is sketched after this cell." ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "# Note: some diagnosis codes are non-numeric (e.g. ICD-9 'V' and 'E' codes);\n", "# casting them to double yields null, and those rows are dropped below\n", "diabetes = diabetes.withColumn(\"diag_2\", diabetes[\"diag_2\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"diag_3\", diabetes[\"diag_3\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"diag_1\", diabetes[\"diag_1\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"time_in_hospital\", diabetes[\"time_in_hospital\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"num_lab_procedures\", diabetes[\"num_lab_procedures\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"num_medications\", diabetes[\"num_medications\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"number_emergency\", diabetes[\"number_emergency\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"number_inpatient\", diabetes[\"number_inpatient\"].cast(DoubleType()))\n", "diabetes = diabetes.withColumn(\"number_diagnoses\", diabetes[\"number_diagnoses\"].cast(DoubleType()))\n", "\n", "# Dropping the remaining few NA rows\n", "diabetes = diabetes.dropna()\n", "\n", "#diabetes.take(1)" ] },
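{ "cell_type": "markdown", "metadata": {}, "source": [ "* For reference, the hardcoding above can be avoided by looping over the column names. A minimal sketch of the equivalent:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: cast a list of columns to double in a loop instead of line by line\n", "from pyspark.sql.types import DoubleType\n", "\n", "numeric_cols = [\"diag_1\", \"diag_2\", \"diag_3\", \"time_in_hospital\", \"num_lab_procedures\",\n", "                \"num_medications\", \"number_emergency\", \"number_inpatient\", \"number_diagnoses\"]\n", "for c in numeric_cols:\n", "    diabetes = diabetes.withColumn(c, diabetes[c].cast(DoubleType()))" ] },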
{ "cell_type": "markdown", "metadata": {}, "source": [ "* I imported a list of redundant and unbalanced features identified in an earlier analysis in R\n", "\n", "\n", "* Deleting these features" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Redundant and unbalanced feature list imported from R\n", "diabetes = diabetes.drop('examide', 'citoglipton', 'metformin-rosiglitazone', 'metformin-pioglitazone',\n", "                         'glimepiride-pioglitazone', 'acetohexamide',\n", "                         'repaglinide', 'nateglinide', 'chlorpropamide', 'tolbutamide', 'acarbose', 'miglitol',\n", "                         'troglitazone', 'tolazamide', 'glyburide-metformin', 'glipizide-metformin')\n", "diabetes" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Prepping for the modelling stage\n", "\n", "* Spark.ML's current functionality deals only with numeric columns\n", "\n", "\n", "* We will use the StringIndexer class to index each categorical column\n", "\n", "\n", "* We will leave the columns that are already double as they are" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[race: string, gender: string, age: string, admission_type_id: string, discharge_disposition_id: string, admission_source_id: string, time_in_hospital: double, num_lab_procedures: double, num_procedures: string, num_medications: double, number_outpatient: string, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, max_glu_serum: string, A1Cresult: string, metformin: string, glimepiride: string, glipizide: string, glyburide: string, pioglitazone: string, rosiglitazone: string, insulin: string, change: string, diabetesMed: string, readmitted: string, race_index: double, gender_index: double, age_index: double, admission_type_id_index: double, discharge_disposition_id_index: double, admission_source_id_index: double, num_procedures_index: double, number_outpatient_index: double, max_glu_serum_index: double, A1Cresult_index: double, metformin_index: double, glimepiride_index: double, glipizide_index: double, glyburide_index: double, pioglitazone_index: double, rosiglitazone_index: double, insulin_index: double, change_index: double, diabetesMed_index: double, readmitted_index: double]" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.feature import StringIndexer\n", "\n", "# Creating a list of STRING dtype columns\n", "cols_to_index = [name for name, dtype in diabetes.dtypes if dtype == \"string\"]\n", "\n", "# Creating and fitting an indexer for each column\n", "indexers = [StringIndexer(inputCol=column, outputCol=column+\"_index\") for column in cols_to_index]\n", "indexers_fitted = [ind.fit(diabetes) for ind in indexers]\n", "\n", "# Passing the fitted indexers to a Pipeline, which transforms our DF in \"stages\"\n", "pipeline = Pipeline(stages=indexers_fitted)\n", "diabetes_indexed = pipeline.fit(diabetes).transform(diabetes)\n", "diabetes_indexed" ] },
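{ "cell_type": "markdown", "metadata": {}, "source": [ "* StringIndexer assigns indices by descending frequency, so the most common level becomes 0.0. To check which level of the response ended up as which index, we can inspect the fitted indexer's labels. A quick sketch, assuming readmitted is the last string column and therefore the last indexer in the list:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: label -> index mapping of the fitted indexer for the response.\n", "# labels[0] is the level mapped to 0.0, labels[1] the level mapped to 1.0\n", "readmitted_indexer = indexers_fitted[-1]\n", "print(readmitted_indexer.labels)" ] },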
Response is \"readmitted\"" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "diabetes_indexed = diabetes_indexed.drop('race',\n", " 'gender',\n", " 'age',\n", " 'admission_type_id',\n", " 'discharge_disposition_id',\n", " 'admission_source_id',\n", " 'num_procedures',\n", " 'number_outpatient',\n", " 'max_glu_serum',\n", " 'A1Cresult',\n", " 'metformin',\n", " 'glimepiride',\n", " 'glipizide',\n", " 'glyburide',\n", " 'pioglitazone',\n", " 'rosiglitazone',\n", " 'insulin',\n", " 'change',\n", " 'diabetesMed',\n", " 'readmitted')\n", "\n", "diabetes_indexed = diabetes_indexed.withColumnRenamed(\"readmitted_index\", \"readmitted\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## ChiSq feature selection.\n", "\n", "* Spark has several different modules for feature selection\n", "\n", "\n", "* I tried to use ChiSq feature selection and was successful\n", "\n", "\n", "* I decided to not include it in my analysis as it needs more work" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [], "source": [ "#from pyspark.ml.linalg import Vectors\n", "#from pyspark.ml.feature import ChiSqSelector\n", "\n", "#def Vectorize(data):\n", "# return data.rdd.map(lambda r: [Vectors.dense(r[0:19]), r[19]]).toDF([\"features\",\"readmitted\"])\n", "\n", "#diabetes_indexed_vectorized = Vectorize(diabetes_indexed)\n", "\n", "#selector = ChiSqSelector(numTopFeatures=7, featuresCol=\"features\",\n", "# outputCol=\"selectedFeatures\", labelCol=\"readmitted\")\n", "\n", "\n", "#diabetes_chsq = selector.fit(diabetes_indexed_vectorized).transform(diabetes_indexed_vectorized)\n", "#diabetes_chsq = diabetes_chsq.drop('features')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### One Hot Encoding\n", "\n", "* Creating a list of features to be OHE" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['race_index',\n", " 'gender_index',\n", " 'age_index',\n", " 'admission_type_id_index',\n", " 'discharge_disposition_id_index',\n", " 'admission_source_id_index',\n", " 'num_procedures_index',\n", " 'number_outpatient_index',\n", " 'max_glu_serum_index',\n", " 'A1Cresult_index',\n", " 'metformin_index',\n", " 'glimepiride_index',\n", " 'glipizide_index',\n", " 'glyburide_index',\n", " 'pioglitazone_index',\n", " 'rosiglitazone_index',\n", " 'insulin_index',\n", " 'change_index',\n", " 'diabetesMed_index']" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "cols_to_ohe = cols_to_index\n", "cols_to_ohe.remove(\"readmitted\")\n", "cols_to_ohe_ = [col+\"_index\" for col in cols_to_ohe]\n", "cols_to_ohe_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* OHE Code\n", "\n", "* Output of OHE is in the form of sparse vectors. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "* OHE code\n", "\n", "* The output of OHE is in the form of sparse vectors, one per feature" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(time_in_hospital=3.0, num_lab_procedures=59.0, num_medications=18.0, number_emergency=0.0, number_inpatient=0.0, diag_1=276.0, diag_2=250.01, diag_3=255.0, number_diagnoses=9.0, race_index=0.0, gender_index=0.0, age_index=8.0, admission_type_id_index=0.0, discharge_disposition_id_index=0.0, admission_source_id_index=0.0, num_procedures_index=0.0, number_outpatient_index=0.0, max_glu_serum_index=0.0, A1Cresult_index=0.0, metformin_index=0.0, glimepiride_index=0.0, glipizide_index=0.0, glyburide_index=0.0, pioglitazone_index=0.0, rosiglitazone_index=0.0, insulin_index=3.0, change_index=1.0, diabetesMed_index=0.0, readmitted=1.0, metformin_index_vector=SparseVector(3, {0: 1.0}), admission_source_id_index_vector=SparseVector(15, {0: 1.0}), rosiglitazone_index_vector=SparseVector(3, {0: 1.0}), glimepiride_index_vector=SparseVector(3, {0: 1.0}), discharge_disposition_id_index_vector=SparseVector(25, {0: 1.0}), glipizide_index_vector=SparseVector(3, {0: 1.0}), max_glu_serum_index_vector=SparseVector(3, {0: 1.0}), gender_index_vector=SparseVector(2, {0: 1.0}), number_outpatient_index_vector=SparseVector(38, {0: 1.0}), race_index_vector=SparseVector(4, {0: 1.0}), diabetesMed_index_vector=SparseVector(1, {0: 1.0}), admission_type_id_index_vector=SparseVector(7, {0: 1.0}), A1Cresult_index_vector=SparseVector(3, {0: 1.0}), change_index_vector=SparseVector(1, {}), glyburide_index_vector=SparseVector(3, {0: 1.0}), age_index_vector=SparseVector(9, {8: 1.0}), insulin_index_vector=SparseVector(3, {}), pioglitazone_index_vector=SparseVector(3, {0: 1.0}), num_procedures_index_vector=SparseVector(6, {0: 1.0}))]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "output_ohe_cols = [x+\"_vector\" for x in cols_to_ohe_]\n", "\n", "from pyspark.ml.feature import OneHotEncoderEstimator\n", "encoder = OneHotEncoderEstimator(inputCols=cols_to_ohe_,\n", "                                 outputCols=output_ohe_cols)\n", "\n", "model_ohe = encoder.fit(diabetes_indexed)\n", "diabetes_ohe = model_ohe.transform(diabetes_indexed)\n", "diabetes_ohe.take(1)" ] },
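{ "cell_type": "markdown", "metadata": {}, "source": [ "* Note vectors like change_index_vector=SparseVector(1, {}) above: the encoder drops the last category by default (dropLast=True), so a binary column becomes a single indicator and the dropped level shows up as an all-zeros vector. A sketch of how one would keep an indicator for every category instead (not used here, since it introduces linearly dependent columns):" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: keep all categories instead of dropping the last one\n", "encoder_full = OneHotEncoderEstimator(inputCols=cols_to_ohe_,\n", "                                      outputCols=output_ohe_cols,\n", "                                      dropLast=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "* Dropping original categorical columns. 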
Spark doesn't do that for us" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[time_in_hospital: double, num_lab_procedures: double, num_medications: double, number_emergency: double, number_inpatient: double, diag_1: double, diag_2: double, diag_3: double, number_diagnoses: double, readmitted: double, metformin_index_vector: vector, admission_source_id_index_vector: vector, rosiglitazone_index_vector: vector, glimepiride_index_vector: vector, discharge_disposition_id_index_vector: vector, glipizide_index_vector: vector, max_glu_serum_index_vector: vector, gender_index_vector: vector, number_outpatient_index_vector: vector, race_index_vector: vector, diabetesMed_index_vector: vector, admission_type_id_index_vector: vector, A1Cresult_index_vector: vector, change_index_vector: vector, glyburide_index_vector: vector, age_index_vector: vector, insulin_index_vector: vector, pioglitazone_index_vector: vector, num_procedures_index_vector: vector]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "diabetes_ohe = diabetes_ohe.drop('race_index',\n", " 'gender_index',\n", " 'age_index',\n", " 'admission_type_id_index',\n", " 'discharge_disposition_id_index',\n", " 'admission_source_id_index',\n", " 'num_procedures_index',\n", " 'number_outpatient_index',\n", " 'max_glu_serum_index',\n", " 'A1Cresult_index',\n", " 'metformin_index',\n", " 'glimepiride_index',\n", " 'glipizide_index',\n", " 'glyburide_index',\n", " 'pioglitazone_index',\n", " 'rosiglitazone_index',\n", " 'insulin_index',\n", " 'change_index',\n", " 'diabetesMed_index')\n", "diabetes_ohe" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+\n", "|time_in_hospital|num_lab_procedures|num_medications|number_emergency|number_inpatient|diag_1|diag_2|diag_3|number_diagnoses|readmitted|metformin_index_vector|admission_source_id_index_vector|rosiglitazone_index_vector|glimepiride_index_vector|discharge_disposition_id_index_vector|glipizide_index_vector|max_glu_serum_index_vector|gender_index_vector|number_outpatient_index_vector|race_index_vector|diabetesMed_index_vector|admission_type_id_index_vector|A1Cresult_index_vector|change_index_vector|glyburide_index_vector|age_index_vector|insulin_index_vector|pioglitazone_index_vector|num_procedures_index_vector|\n", 
"+----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+\n", "| 3.0| 59.0| 18.0| 0.0| 0.0| 276.0|250.01| 255.0| 9.0| 1.0| (3,[0],[1.0])| (15,[0],[1.0])| (3,[0],[1.0])| (3,[0],[1.0])| (25,[0],[1.0])| (3,[0],[1.0])| (3,[0],[1.0])| (2,[0],[1.0])| (38,[0],[1.0])| (4,[0],[1.0])| (1,[0],[1.0])| (7,[0],[1.0])| (3,[0],[1.0])| (1,[],[])| (3,[0],[1.0])| (9,[8],[1.0])| (3,[],[])| (3,[0],[1.0])| (6,[0],[1.0])|\n", "+----------------+------------------+---------------+----------------+----------------+------+------+------+----------------+----------+----------------------+--------------------------------+--------------------------+------------------------+-------------------------------------+----------------------+--------------------------+-------------------+------------------------------+-----------------+------------------------+------------------------------+----------------------+-------------------+----------------------+----------------+--------------------+-------------------------+---------------------------+\n", "only showing top 1 row\n", "\n" ] } ], "source": [ "diabetes_ohe.show(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Its a bit difficult to see but each column has either a double value or a sparkse vector value.\n", "\n", "\n", "* We need to assemble these features together in a way which Spark.ml algorithms require the features to be.\n", "\n", "\n", "* We will use VectorAssembler to do that" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------------------+\n", "|readmitted| features|\n", "+----------+--------------------+\n", "| 1.0|(144,[0,1,2,5,6,7...|\n", "| 0.0|(144,[0,1,2,5,6,7...|\n", "| 0.0|(144,[0,1,2,5,6,7...|\n", "| 1.0|(144,[0,1,2,5,6,7...|\n", "| 1.0|(144,[0,1,2,5,6,7...|\n", "+----------+--------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "# The following code will take in all of these columns and convert it to 1 column named \"features\" which will store data of\n", "# ALL features for 1 record (row)\n", "\n", "from pyspark.ml.feature import VectorAssembler\n", "\n", "assembler = VectorAssembler(\n", " inputCols=['time_in_hospital',\n", " 'num_lab_procedures',\n", " 'num_medications',\n", " 'number_emergency',\n", " 'number_inpatient',\n", " 'diag_1',\n", " 'diag_2',\n", " 'diag_3',\n", " 'number_diagnoses',\n", " 'metformin_index_vector',\n", " 'admission_source_id_index_vector',\n", " 'rosiglitazone_index_vector',\n", " 'glimepiride_index_vector',\n", " 'discharge_disposition_id_index_vector',\n", " 'glipizide_index_vector',\n", " 'max_glu_serum_index_vector',\n", " 'gender_index_vector',\n", " 'number_outpatient_index_vector',\n", " 'race_index_vector',\n", " 'diabetesMed_index_vector',\n", " 'admission_type_id_index_vector',\n", " 'A1Cresult_index_vector',\n", " 'change_index_vector',\n", " 'glyburide_index_vector',\n", " 'age_index_vector',\n", " 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Logistic Regression\n", "\n", "* Training logistic regression with elastic net regularization\n", "\n", "* **Accuracy = 53.9%**" ] },
{ "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Accuracy = 0.539396\n" ] } ], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0.8, labelCol=\"readmitted\",\n", "                        featuresCol=\"features\")\n", "lrModel = lr.fit(training)\n", "\n", "preds_lr = lrModel.transform(testing)\n", "\n", "# Select (prediction, true label) and compute test error\n", "evaluator = MulticlassClassificationEvaluator(\n", "    labelCol=\"readmitted\", predictionCol=\"prediction\", metricName=\"accuracy\")\n", "\n", "accuracy = evaluator.evaluate(preds_lr)\n", "print(\"Accuracy = %g\" % accuracy)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }