{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Classification in PySpark\n", "> Now that you are familiar with getting data into Spark, you'll move onto building two types of classification model - Decision Trees and Logistic Regression. You'll also find out about a few approaches to data preparation. This is the Summary of lecture \"Machine Learning with PySpark\n", "\", via datacamp.\n", "\n", "- toc: true \n", "- badges: true\n", "- comments: true\n", "- author: Chanseok Kang\n", "- categories: [Python, Datacamp, PySpark, Machine_Learning]\n", "- image: " ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "import numpy as np\n", "import pandas as pd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Preparation" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Removing columns and rows\n", "You previously loaded airline flight data from a CSV file. You're going to develop a model which will predict whether or not a given flight will be delayed.\n", "\n", "In this exercise you need to trim those data down by:\n", "\n", "1. removing an uninformative column and\n", "2. removing rows which do not have information about whether or not a flight was delayed." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+---+-------+------+---+----+------+--------+-----+\n", "|mon|dom|dow|carrier|flight|org|mile|depart|duration|delay|\n", "+---+---+---+-------+------+---+----+------+--------+-----+\n", "| 10| 10| 1| OO| 5836|ORD| 157| 8.18| 51| 27|\n", "| 1| 4| 1| OO| 5866|ORD| 466| 15.5| 102| null|\n", "| 11| 22| 1| OO| 6016|ORD| 738| 7.17| 127| -19|\n", "| 2| 14| 5| B6| 199|JFK|2248| 21.17| 365| 60|\n", "| 5| 25| 3| WN| 1675|SJC| 386| 12.92| 85| 22|\n", "+---+---+---+-------+------+---+----+------+--------+-----+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.master('local[*]').appName('flights').getOrCreate()\n", "\n", "# Read data from CSV file\n", "flights = spark.read.csv('./dataset/flights-larger.csv', sep=',', header=True, inferSchema=True,\n", " nullValue='NA')\n", "\n", "flights.show(5)" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "258289\n" ] } ], "source": [ "# Remove the 'flight' column\n", "flights_drop_column = flights.drop('flight')\n", "\n", "# Number of records with missing 'delay' values\n", "flights_drop_column.filter('delay IS NULL').count()\n", "\n", "# Remove records with missing 'delay' values\n", "flights_valid_delay = flights_drop_column.filter('delay IS NOT NULL')\n", "\n", "# Remove records with missing values in any column and get the number of remaining rows\n", "flights_none_missing = flights_valid_delay.dropna()\n", "print(flights_none_missing.count())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Column manipulation\n", "The Federal Aviation Administration (FAA) considers a flight to be \"delayed\" when it arrives 15 minutes or more after its scheduled time.\n", "\n", "The next step of preparing the flight data has two parts:\n", "\n", "1. convert the units of distance, replacing the `mile` column with a `kmcolumn`; and\n", "2. create a Boolean column indicating whether or not a flight was delayed." 
] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+---+-------+---+------+--------+-----+------+-----+\n", "|mon|dom|dow|carrier|org|depart|duration|delay| km|label|\n", "+---+---+---+-------+---+------+--------+-----+------+-----+\n", "| 10| 10| 1| OO|ORD| 8.18| 51| 27| 253.0| 1|\n", "| 11| 22| 1| OO|ORD| 7.17| 127| -19|1188.0| 0|\n", "| 2| 14| 5| B6|JFK| 21.17| 365| 60|3618.0| 1|\n", "| 5| 25| 3| WN|SJC| 12.92| 85| 22| 621.0| 1|\n", "| 3| 28| 1| B6|LGA| 13.33| 182| 70|1732.0| 1|\n", "+---+---+---+-------+---+------+--------+-----+------+-----+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import round\n", "\n", "# Convert 'mile' to 'km' and drop 'mile' column\n", "flights_km = flights_none_missing.withColumn('km', round(flights_none_missing.mile * 1.60934, 0)).drop('mile')\n", "\n", "# Create 'label' column indicating whether flight delayed (1) or not(0)\n", "flights_km = flights_km.withColumn('label', (flights_km.delay >= 15).cast('integer'))\n", "\n", "# Check first five records\n", "flights_km.show(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Categorical columns\n", "In the flights data there are two columns, `carrier` and `org`, which hold categorical data. You need to transform those columns into indexed numerical values.\n", "\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml.feature import StringIndexer\n", "\n", "# Create an indexer\n", "indexer = StringIndexer(inputCol='carrier', outputCol='carrier_idx')\n", "\n", "# Indexer identifies categories in the data\n", "indexer_model = indexer.fit(flights_km)\n", "\n", "# Indexer creates a new column with numeric index values\n", "flights_indexed = indexer_model.transform(flights_km)\n", "\n", "# Repeat the process for the other categorical feature\n", "flights_indexed = StringIndexer(inputCol='org', outputCol='org_idx').fit(flights_indexed).transform(flights_indexed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Assembling columns\n", "The final stage of data preparation is to consolidate all of the predictor columns into a single column.\n", "\n", "At present our data has the following predictor columns:\n", "\n", "- `mon`, `dom` and `dow`\n", "- `carrier_idx` (derived from `carrier`)\n", "- `org_idx` (derived from `org`)\n", "- `km`\n", "- `depart`\n", "- `duration`" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----------------------------------------+-----+\n", "|features |delay|\n", "+-----------------------------------------+-----+\n", "|[10.0,10.0,1.0,2.0,0.0,253.0,8.18,51.0] |27 |\n", "|[11.0,22.0,1.0,2.0,0.0,1188.0,7.17,127.0]|-19 |\n", "|[2.0,14.0,5.0,4.0,2.0,3618.0,21.17,365.0]|60 |\n", "|[5.0,25.0,3.0,3.0,5.0,621.0,12.92,85.0] |22 |\n", "|[3.0,28.0,1.0,4.0,3.0,1732.0,13.33,182.0]|70 |\n", "+-----------------------------------------+-----+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "\n", "# Create an assembler object\n", "assembler = VectorAssembler(inputCols=[\n", " 'mon', 'dom', 'dow',\n", " 'carrier_idx', \n", " 'org_idx',\n", " 'km', 'depart', 'duration'\n", "], outputCol='features')\n", "\n", "# Consolidate predictor columns\n", "flights_assembled = assembler.transform(flights_indexed)\n", "\n", "# Check the resulting column\n", 
"flights_assembled.select('features', 'delay').show(5, truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Decision Tree\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Train/test split\n", "To objectively assess a Machine Learning model you need to be able to test it on an independent set of data. You can't use the same data that you used to train the model: of course the model will perform (relatively) well on those data!\n", "\n", "You will split the data into two components:\n", "\n", "- training data (used to train the model) and\n", "- testing data (used to test the model)." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.7996856234682855\n" ] } ], "source": [ "# Split into training and test sets in a 80:20 ratio\n", "flights_train, flights_test = flights_assembled.randomSplit([0.8, 0.2], seed=17)\n", "\n", "# Check that training set has around 80% of records\n", "training_ratio = flights_train.count() / flights_assembled.count()\n", "print(training_ratio)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build a Decision Tree\n", "Now that you've split the flights data into training and testing sets, you can use the training set to fit a Decision Tree model." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----------+----------------------------------------+\n", "|label|prediction|probability |\n", "+-----+----------+----------------------------------------+\n", "|1 |0.0 |[0.6282628262826283,0.37173717371737175]|\n", "|1 |0.0 |[0.6282628262826283,0.37173717371737175]|\n", "|0 |1.0 |[0.3195001440187889,0.6804998559812111] |\n", "|1 |1.0 |[0.3195001440187889,0.6804998559812111] |\n", "|0 |1.0 |[0.37094017094017095,0.629059829059829] |\n", "+-----+----------+----------------------------------------+\n", "only showing top 5 rows\n", "\n" ] } ], "source": [ "from pyspark.ml.classification import DecisionTreeClassifier\n", "\n", "# Create a classifier object and fit to the training data\n", "tree = DecisionTreeClassifier()\n", "tree_model = tree.fit(flights_train)\n", "\n", "# Create predictions for the testing data and take a look at the predictions\n", "prediction = tree_model.transform(flights_test)\n", "prediction.select('label', 'prediction', 'probability').show(5, False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluate the Decision Tree\n", "You can assess the quality of your model by evaluating how well it performs on the testing data. Because the model was not trained on these data, this represents an objective assessment of the model.\n", "\n", "A confusion matrix gives a useful breakdown of predictions versus known values. It has four cells which represent the counts of:\n", "\n", "- True Negatives (TN) — model predicts negative outcome & known outcome is negative\n", "- True Positives (TP) — model predicts positive outcome & known outcome is positive\n", "- False Negatives (FN) — model predicts negative outcome but known outcome is positive\n", "- False Positives (FP) — model predicts positive outcome but known outcome is negative." 
] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----------+-----+\n", "|label|prediction|count|\n", "+-----+----------+-----+\n", "| 1| 0.0| 9472|\n", "| 0| 0.0|16371|\n", "| 1| 1.0|16643|\n", "| 0| 1.0| 9253|\n", "+-----+----------+-----+\n", "\n", "0.6380873229092174\n" ] } ], "source": [ "# Create a confusion matrix\n", "prediction.groupBy('label', 'prediction').count().show()\n", "\n", "# Calculate the elements of the confusion matrix\n", "TN = prediction.filter('prediction = 0 AND label = prediction').count()\n", "TP = prediction.filter('prediction = 1 AND label = prediction').count()\n", "FN = prediction.filter('prediction = 0 AND label = 1').count()\n", "FP = prediction.filter('prediction = 1 AND label = 0').count()\n", "\n", "# Accuracy measures the proportion of correct predictions\n", "accuracy = (TN + TP) / (TN + TP + FN + FP)\n", "print(accuracy)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Logistic Regression\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Build a Logistic Regression model\n", "You've already built a Decision Tree model using the flights data. Now you're going to create a Logistic Regression model on the same data.\n", "\n", "The objective is to predict whether a flight is likely to be delayed by at least 15 minutes (label 1) or not (label 0).\n", "\n", "Although you have a variety of predictors at your disposal, you'll only use the `mon`, `depart` and `duration` columns for the moment. These are numerical features which can immediately be used for a Logistic Regression model. You'll need to do a little more work before you can include categorical features." ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----------+-----+\n", "|label|prediction|count|\n", "+-----+----------+-----+\n", "| 1| 0.0| 9455|\n", "| 0| 0.0|14931|\n", "| 1| 1.0|16660|\n", "| 0| 1.0|10693|\n", "+-----+----------+-----+\n", "\n" ] } ], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "\n", "# Selecting numeric columns\n", "flights_train_num = flights_train.select(\"mon\", 'depart', 'duration', 'features', 'label')\n", "flights_test_num = flights_test.select(\"mon\", \"depart\", \"duration\", 'features', 'label')\n", "\n", "# Create classifier object and train on training data\n", "logistic = LogisticRegression().fit(flights_train_num)\n", "\n", "# Create a predictions for the test data and show confusion matrix\n", "prediction = logistic.transform(flights_test_num)\n", "prediction.groupBy(\"label\", \"prediction\").count().show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluate the Logistic Regression model\n", "Accuracy is generally not a very reliable metric because it can be biased by the most common target class.\n", "\n", "There are two other useful metrics:\n", "\n", "- precision and\n", "- recall.\n", "\n", "Precision is the proportion of positive predictions which are correct. For all flights which are predicted to be delayed, what proportion is actually delayed?\n", "\n", "Recall is the proportion of positives outcomes which are correctly predicted. For all delayed flights, what proportion is correctly predicted by the model?\n", "\n", "The precision and recall are generally formulated in terms of the positive target class. 
But it's also possible to calculate weighted versions of these metrics which look at both target classes." ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "# Calculate the elements of the confusion matrix\n", "TN = prediction.filter('prediction = 0 AND label = prediction').count()\n", "TP = prediction.filter('prediction = 1 AND label = prediction').count()\n", "FN = prediction.filter('prediction = 0 AND label = 1').count()\n", "FP = prediction.filter('prediction = 1 AND label = 0').count()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "precision = 0.61\n", "recall = 0.64\n" ] } ], "source": [ "from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator\n", "\n", "# Calculate precision and recall\n", "precision = TP / (TP + FP)\n", "recall = TP / (TP + FN)\n", "print('precision = {:.2f}\\nrecall = {:.2f}'.format(precision, recall))\n", "\n", "# Find weighted precision\n", "multi_evaluator = MulticlassClassificationEvaluator()\n", "weighted_precision = multi_evaluator.evaluate(prediction, {multi_evaluator.metricName: \"weightedPrecision\"})\n", "\n", "# Find AUC\n", "binary_evaluator = BinaryClassificationEvaluator()\n", "auc = binary_evaluator.evaluate(prediction, {binary_evaluator.metricName: \"areaUnderROC\"})" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.6106605467579678\n", "0.6504917093209514\n" ] } ], "source": [ "print(weighted_precision)\n", "print(auc)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Turning Text into Tables\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Punctuation, numbers and tokens\n", "At the end of the previous chapter you loaded a dataset of SMS messages which had been labeled as either \"spam\" (label 1) or \"ham\" (label 0). You're now going to use those data to build a classifier model.\n", "\n", "But first you'll need to prepare the SMS messages as follows:\n", "\n", "- remove punctuation and numbers\n", "- tokenize (split into individual words)\n", "- remove stop words\n", "- apply the hashing trick\n", "- convert to TF-IDF representation.\n", "\n", "In this exercise you'll remove punctuation and numbers, then tokenize the messages." 
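, "\n", "\n", "For example, after the punctuation and number characters are replaced and the text is tokenized, the message \"Sorry I'll call later in meeting\" becomes the token list `[sorry, i'll, call, later, in, meeting]`, as shown in the output below."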
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql.types import StructType, StructField, IntegerType, StringType\n", "\n", "# Specify column names and types\n", "schema = StructType([\n", " StructField(\"id\", IntegerType()),\n", " StructField(\"text\", StringType()),\n", " StructField(\"label\", IntegerType())\n", "])\n", "\n", "# Load data from a delimited file\n", "sms = spark.read.csv('./dataset/sms.csv', sep=';', header=False, schema=schema)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----------------------------------+-----+------------------------------------------+\n", "|id |text |label|words |\n", "+---+----------------------------------+-----+------------------------------------------+\n", "|1 |Sorry I'll call later in meeting |0 |[sorry, i'll, call, later, in, meeting] |\n", "|2 |Dont worry I guess he's busy |0 |[dont, worry, i, guess, he's, busy] |\n", "|3 |Call FREEPHONE now |1 |[call, freephone, now] |\n", "|4 |Win a cash prize or a prize worth |1 |[win, a, cash, prize, or, a, prize, worth]|\n", "+---+----------------------------------+-----+------------------------------------------+\n", "only showing top 4 rows\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import regexp_replace\n", "from pyspark.ml.feature import Tokenizer\n", "\n", "# Remove punctuation (REGEX provided) and numbers\n", "wrangled = sms.withColumn('text', regexp_replace(sms.text, '[_():;,.!?\\\\-]', ' '))\n", "wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, '[0-9]', ' '))\n", "\n", "# Merge multiple spaces\n", "wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, ' +', ' '))\n", "\n", "# Split the text into words\n", "wrangled = Tokenizer(inputCol='text', outputCol='words').transform(wrangled)\n", "\n", "wrangled.show(4, truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stop words and hashing\n", "The next steps will be to remove stop words and then apply the hashing trick, converting the results into a TF-IDF.\n", "\n", "A quick reminder about these concepts:\n", "\n", "- The hashing trick provides a fast and space-efficient way to map a very large (possibly infinite) set of items (in this case, all words contained in the SMS messages) onto a smaller, finite number of values.\n", "- The TF-IDF matrix reflects how important a word is to each document. It takes into account both the frequency of the word within each document but also the frequency of the word across all of the documents in the collection." 
] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------------------------------+----------------------------------------------------------------------------------------------------+\n", "|terms |features |\n", "+--------------------------------+----------------------------------------------------------------------------------------------------+\n", "|[sorry, call, later, meeting] |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|\n", "|[dont, worry, guess, busy] |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |\n", "|[call, freephone] |(1024,[133,138],[5.367951058306837,2.273418200008753]) |\n", "|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114]) |\n", "+--------------------------------+----------------------------------------------------------------------------------------------------+\n", "only showing top 4 rows\n", "\n" ] } ], "source": [ "from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF\n", "\n", "sms = wrangled.select('id', 'words', 'label')\n", "\n", "# Remove stop words.\n", "wrangled = StopWordsRemover(inputCol='words', outputCol='terms').transform(sms)\n", "\n", "# Apply the hashing trick\n", "wrangled = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024).transform(wrangled)\n", "\n", "# Convert hashed symbols to TF-IDF\n", "tf_idf = IDF(inputCol='hash', outputCol='features').fit(wrangled).transform(wrangled)\n", "\n", "tf_idf.select('terms', 'features').show(4, truncate=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Training a spam classifier\n", "The SMS data have now been prepared for building a classifier. Specifically, this is what you have done:\n", "\n", "- removed numbers and punctuation\n", "- split the messages into words (or \"tokens\")\n", "- removed stop words\n", "- applied the hashing trick and\n", "- converted to a TF-IDF representation.\n", "\n", "Next you'll need to split the TF-IDF data into training and testing sets. Then you'll use the training data to fit a Logistic Regression model and finally evaluate the performance of that model on the testing data." 
] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+----------+-----+\n", "|label|prediction|count|\n", "+-----+----------+-----+\n", "| 1| 0.0| 39|\n", "| 0| 0.0| 932|\n", "| 1| 1.0| 121|\n", "| 0| 1.0| 4|\n", "+-----+----------+-----+\n", "\n" ] } ], "source": [ "sms = tf_idf.select('label', 'features')\n", "\n", "# Split the data into training and test sets\n", "sms_train, sms_test = sms.randomSplit([0.8, 0.2], seed=13)\n", "\n", "# Fit a Logistic Regression model to the training data\n", "logistic = LogisticRegression(regParam=0.2).fit(sms_train)\n", "\n", "# Make predictions on the test data\n", "prediction = logistic.transform(sms_test)\n", "\n", "# Create a confusion matrix, comparing predictions to known labels\n", "prediction.groupBy('label', 'prediction').count().show()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }