{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import os.path\n", "import urllib.request\n", "import gzip\n", "import shutil\n", "\n", "if not os.path.exists('winequality-red.csv'):\n", " urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv', 'winequality-red.csv')\n", "if not os.path.exists('winequality-white.csv'):\n", " urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-white.csv', 'winequality-white.csv')\n", "if not os.path.exists('winequality.names'):\n", " urllib.request.urlretrieve('https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality.names', 'winequality.names')\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "from pyspark.sql import SparkSession\n", "from pyspark.sql.functions import col\n", "\n", "spark = SparkSession.builder.appName('wine-quality').getOrCreate()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- fixed acidity: double (nullable = true)\n", " |-- volatile acidity: double (nullable = true)\n", " |-- citric acid: double (nullable = true)\n", " |-- residual sugar: double (nullable = true)\n", " |-- chlorides: double (nullable = true)\n", " |-- free sulfur dioxide: double (nullable = true)\n", " |-- total sulfur dioxide: double (nullable = true)\n", " |-- density: double (nullable = true)\n", " |-- pH: double (nullable = true)\n", " |-- sulphates: double (nullable = true)\n", " |-- alcohol: double (nullable = true)\n", " |-- quality: integer (nullable = true)\n", "\n" ] }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: 
top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>fixed acidity</th>\n", " <th>volatile acidity</th>\n", " <th>citric acid</th>\n", " <th>residual sugar</th>\n", " <th>chlorides</th>\n", " <th>free sulfur dioxide</th>\n", " <th>total sulfur dioxide</th>\n", " <th>density</th>\n", " <th>pH</th>\n", " <th>sulphates</th>\n", " <th>alcohol</th>\n", " <th>quality</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>7.4</td>\n", " <td>0.700</td>\n", " <td>0.00</td>\n", " <td>1.9</td>\n", " <td>0.076</td>\n", " <td>11.0</td>\n", " <td>34.0</td>\n", " <td>0.9978</td>\n", " <td>3.51</td>\n", " <td>0.56</td>\n", " <td>9.4</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>7.8</td>\n", " <td>0.880</td>\n", " <td>0.00</td>\n", " <td>2.6</td>\n", " <td>0.098</td>\n", " <td>25.0</td>\n", " <td>67.0</td>\n", " <td>0.9968</td>\n", " <td>3.20</td>\n", " <td>0.68</td>\n", " <td>9.8</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>7.8</td>\n", " <td>0.760</td>\n", " <td>0.04</td>\n", " <td>2.3</td>\n", " <td>0.092</td>\n", " <td>15.0</td>\n", " <td>54.0</td>\n", " <td>0.9970</td>\n", " <td>3.26</td>\n", " <td>0.65</td>\n", " <td>9.8</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>11.2</td>\n", " <td>0.280</td>\n", " <td>0.56</td>\n", " <td>1.9</td>\n", " <td>0.075</td>\n", " <td>17.0</td>\n", " <td>60.0</td>\n", " <td>0.9980</td>\n", " <td>3.16</td>\n", " <td>0.58</td>\n", " <td>9.8</td>\n", " <td>6</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>7.4</td>\n", " <td>0.700</td>\n", " <td>0.00</td>\n", " <td>1.9</td>\n", " <td>0.076</td>\n", " <td>11.0</td>\n", " <td>34.0</td>\n", " <td>0.9978</td>\n", " <td>3.51</td>\n", " <td>0.56</td>\n", " <td>9.4</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", 
" <td>7.4</td>\n", " <td>0.660</td>\n", " <td>0.00</td>\n", " <td>1.8</td>\n", " <td>0.075</td>\n", " <td>13.0</td>\n", " <td>40.0</td>\n", " <td>0.9978</td>\n", " <td>3.51</td>\n", " <td>0.56</td>\n", " <td>9.4</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>6</th>\n", " <td>7.9</td>\n", " <td>0.600</td>\n", " <td>0.06</td>\n", " <td>1.6</td>\n", " <td>0.069</td>\n", " <td>15.0</td>\n", " <td>59.0</td>\n", " <td>0.9964</td>\n", " <td>3.30</td>\n", " <td>0.46</td>\n", " <td>9.4</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>7</th>\n", " <td>7.3</td>\n", " <td>0.650</td>\n", " <td>0.00</td>\n", " <td>1.2</td>\n", " <td>0.065</td>\n", " <td>15.0</td>\n", " <td>21.0</td>\n", " <td>0.9946</td>\n", " <td>3.39</td>\n", " <td>0.47</td>\n", " <td>10.0</td>\n", " <td>7</td>\n", " </tr>\n", " <tr>\n", " <th>8</th>\n", " <td>7.8</td>\n", " <td>0.580</td>\n", " <td>0.02</td>\n", " <td>2.0</td>\n", " <td>0.073</td>\n", " <td>9.0</td>\n", " <td>18.0</td>\n", " <td>0.9968</td>\n", " <td>3.36</td>\n", " <td>0.57</td>\n", " <td>9.5</td>\n", " <td>7</td>\n", " </tr>\n", " <tr>\n", " <th>9</th>\n", " <td>7.5</td>\n", " <td>0.500</td>\n", " <td>0.36</td>\n", " <td>6.1</td>\n", " <td>0.071</td>\n", " <td>17.0</td>\n", " <td>102.0</td>\n", " <td>0.9978</td>\n", " <td>3.35</td>\n", " <td>0.80</td>\n", " <td>10.5</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>10</th>\n", " <td>6.7</td>\n", " <td>0.580</td>\n", " <td>0.08</td>\n", " <td>1.8</td>\n", " <td>0.097</td>\n", " <td>15.0</td>\n", " <td>65.0</td>\n", " <td>0.9959</td>\n", " <td>3.28</td>\n", " <td>0.54</td>\n", " <td>9.2</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>11</th>\n", " <td>7.5</td>\n", " <td>0.500</td>\n", " <td>0.36</td>\n", " <td>6.1</td>\n", " <td>0.071</td>\n", " <td>17.0</td>\n", " <td>102.0</td>\n", " <td>0.9978</td>\n", " <td>3.35</td>\n", " <td>0.80</td>\n", " <td>10.5</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>12</th>\n", " <td>5.6</td>\n", " <td>0.615</td>\n", " 
<td>0.00</td>\n", " <td>1.6</td>\n", " <td>0.089</td>\n", " <td>16.0</td>\n", " <td>59.0</td>\n", " <td>0.9943</td>\n", " <td>3.58</td>\n", " <td>0.52</td>\n", " <td>9.9</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>13</th>\n", " <td>7.8</td>\n", " <td>0.610</td>\n", " <td>0.29</td>\n", " <td>1.6</td>\n", " <td>0.114</td>\n", " <td>9.0</td>\n", " <td>29.0</td>\n", " <td>0.9974</td>\n", " <td>3.26</td>\n", " <td>1.56</td>\n", " <td>9.1</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>14</th>\n", " <td>8.9</td>\n", " <td>0.620</td>\n", " <td>0.18</td>\n", " <td>3.8</td>\n", " <td>0.176</td>\n", " <td>52.0</td>\n", " <td>145.0</td>\n", " <td>0.9986</td>\n", " <td>3.16</td>\n", " <td>0.88</td>\n", " <td>9.2</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>15</th>\n", " <td>8.9</td>\n", " <td>0.620</td>\n", " <td>0.19</td>\n", " <td>3.9</td>\n", " <td>0.170</td>\n", " <td>51.0</td>\n", " <td>148.0</td>\n", " <td>0.9986</td>\n", " <td>3.17</td>\n", " <td>0.93</td>\n", " <td>9.2</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>16</th>\n", " <td>8.5</td>\n", " <td>0.280</td>\n", " <td>0.56</td>\n", " <td>1.8</td>\n", " <td>0.092</td>\n", " <td>35.0</td>\n", " <td>103.0</td>\n", " <td>0.9969</td>\n", " <td>3.30</td>\n", " <td>0.75</td>\n", " <td>10.5</td>\n", " <td>7</td>\n", " </tr>\n", " <tr>\n", " <th>17</th>\n", " <td>8.1</td>\n", " <td>0.560</td>\n", " <td>0.28</td>\n", " <td>1.7</td>\n", " <td>0.368</td>\n", " <td>16.0</td>\n", " <td>56.0</td>\n", " <td>0.9968</td>\n", " <td>3.11</td>\n", " <td>1.28</td>\n", " <td>9.3</td>\n", " <td>5</td>\n", " </tr>\n", " <tr>\n", " <th>18</th>\n", " <td>7.4</td>\n", " <td>0.590</td>\n", " <td>0.08</td>\n", " <td>4.4</td>\n", " <td>0.086</td>\n", " <td>6.0</td>\n", " <td>29.0</td>\n", " <td>0.9974</td>\n", " <td>3.38</td>\n", " <td>0.50</td>\n", " <td>9.0</td>\n", " <td>4</td>\n", " </tr>\n", " <tr>\n", " <th>19</th>\n", " <td>7.9</td>\n", " <td>0.320</td>\n", " <td>0.51</td>\n", " <td>1.8</td>\n", " 
<td>0.341</td>\n", " <td>17.0</td>\n", " <td>56.0</td>\n", " <td>0.9969</td>\n", " <td>3.04</td>\n", " <td>1.08</td>\n", " <td>9.2</td>\n", " <td>6</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " fixed acidity volatile acidity citric acid residual sugar chlorides \\\n", "0 7.4 0.700 0.00 1.9 0.076 \n", "1 7.8 0.880 0.00 2.6 0.098 \n", "2 7.8 0.760 0.04 2.3 0.092 \n", "3 11.2 0.280 0.56 1.9 0.075 \n", "4 7.4 0.700 0.00 1.9 0.076 \n", "5 7.4 0.660 0.00 1.8 0.075 \n", "6 7.9 0.600 0.06 1.6 0.069 \n", "7 7.3 0.650 0.00 1.2 0.065 \n", "8 7.8 0.580 0.02 2.0 0.073 \n", "9 7.5 0.500 0.36 6.1 0.071 \n", "10 6.7 0.580 0.08 1.8 0.097 \n", "11 7.5 0.500 0.36 6.1 0.071 \n", "12 5.6 0.615 0.00 1.6 0.089 \n", "13 7.8 0.610 0.29 1.6 0.114 \n", "14 8.9 0.620 0.18 3.8 0.176 \n", "15 8.9 0.620 0.19 3.9 0.170 \n", "16 8.5 0.280 0.56 1.8 0.092 \n", "17 8.1 0.560 0.28 1.7 0.368 \n", "18 7.4 0.590 0.08 4.4 0.086 \n", "19 7.9 0.320 0.51 1.8 0.341 \n", "\n", " free sulfur dioxide total sulfur dioxide density pH sulphates \\\n", "0 11.0 34.0 0.9978 3.51 0.56 \n", "1 25.0 67.0 0.9968 3.20 0.68 \n", "2 15.0 54.0 0.9970 3.26 0.65 \n", "3 17.0 60.0 0.9980 3.16 0.58 \n", "4 11.0 34.0 0.9978 3.51 0.56 \n", "5 13.0 40.0 0.9978 3.51 0.56 \n", "6 15.0 59.0 0.9964 3.30 0.46 \n", "7 15.0 21.0 0.9946 3.39 0.47 \n", "8 9.0 18.0 0.9968 3.36 0.57 \n", "9 17.0 102.0 0.9978 3.35 0.80 \n", "10 15.0 65.0 0.9959 3.28 0.54 \n", "11 17.0 102.0 0.9978 3.35 0.80 \n", "12 16.0 59.0 0.9943 3.58 0.52 \n", "13 9.0 29.0 0.9974 3.26 1.56 \n", "14 52.0 145.0 0.9986 3.16 0.88 \n", "15 51.0 148.0 0.9986 3.17 0.93 \n", "16 35.0 103.0 0.9969 3.30 0.75 \n", "17 16.0 56.0 0.9968 3.11 1.28 \n", "18 6.0 29.0 0.9974 3.38 0.50 \n", "19 17.0 56.0 0.9969 3.04 1.08 \n", "\n", " alcohol quality \n", "0 9.4 5 \n", "1 9.8 5 \n", "2 9.8 5 \n", "3 9.8 6 \n", "4 9.4 5 \n", "5 9.4 5 \n", "6 9.4 5 \n", "7 10.0 7 \n", "8 9.5 7 \n", "9 10.5 5 \n", "10 9.2 5 \n", "11 10.5 5 \n", "12 9.9 5 \n", "13 9.1 5 \n", "14 
9.2 5 \n", "15 9.2 5 \n", "16 10.5 7 \n", "17 9.3 5 \n", "18 9.0 4 \n", "19 9.2 6 " ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "1599" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "red = spark.read.option(\"delimiter\", \";\").csv('./winequality-red.csv', header='true', inferSchema='true')\n", "# white = spark.read.option(\"delimiter\", \";\").csv('./winequality-white.csv', header='true', inferSchema='true')\n", "\n", "red.printSchema()\n", "display(red.limit(20).toPandas())\n", "red.count()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "338" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Fixed seed so the 80/20 split, and every statistic and score computed from it, is reproducible.\n", "SEED = 42\n", "train, test = red.randomSplit([0.8, 0.2], seed=SEED)\n", "train.count(), test.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Exploratory Data Analysis" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>summary</th>\n", " <th>quality</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>count</td>\n", " <td>1261</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>mean</td>\n", " <td>5.6193497224425055</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>stddev</td>\n", " <td>0.8014311934008284</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>min</td>\n", " <td>3</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>max</td>\n", " <td>8</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], 
"text/plain": [ " summary quality\n", "0 count 1261\n", "1 mean 5.6193497224425055\n", "2 stddev 0.8014311934008284\n", "3 min 3\n", "4 max 8" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>quality</th>\n", " <th>count</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>3</td>\n", " <td>7</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>4</td>\n", " <td>42</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>5</td>\n", " <td>552</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>6</td>\n", " <td>499</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>7</td>\n", " <td>145</td>\n", " </tr>\n", " <tr>\n", " <th>5</th>\n", " <td>8</td>\n", " <td>16</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " quality count\n", "0 3 7\n", "1 4 42\n", "2 5 552\n", "3 6 499\n", "4 7 145\n", "5 8 16" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>skewness(quality)</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>0.292623</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " skewness(quality)\n", "0 0.292623" ] }, 
"metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>kurtosis(quality)</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>0.370935</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " kurtosis(quality)\n", "0 0.370935" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>25%</th>\n", " <th>50%</th>\n", " <th>75%</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>5.0</td>\n", " <td>6.0</td>\n", " <td>6.0</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " 25% 50% 75%\n", "0 5.0 6.0 6.0" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>quality_freqItems</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " 
<th>0</th>\n", " <td>[8, 5, 4, 7, 3, 6]</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " quality_freqItems\n", "0 [8, 5, 4, 7, 3, 6]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql.functions import kurtosis, skewness\n", "\n", "labelCol = 'quality'\n", "\n", "display(train.select(labelCol).describe().toPandas())\n", "display(train.groupby(labelCol).count().orderBy(labelCol).toPandas())\n", "display(train.agg(skewness(labelCol)).toPandas())\n", "display(train.agg(kurtosis(labelCol)).toPandas())\n", "\n", "# Last parameter is error tolerance\n", "quantile = train.approxQuantile(labelCol, [0.25, 0.50, 0.75], 0.05)\n", "quantileFrame = spark.createDataFrame([quantile], ['25%', '50%', '75%'])\n", "display(quantileFrame.toPandas())\n", "train.stat.freqItems([labelCol], 0.1).toPandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The label distribution has small positive skewness and kurtosis, so it is roughly symmetric and bell-shaped, i.e. only loosely normal. Keep in mind that quality is a discrete, ordinal score: most wines fall into classes 5 and 6, while the extreme classes (3, 4 and 8) are rare, which makes them hard for the classifiers below to learn. 
" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "<matplotlib.axes._subplots.AxesSubplot at 0x7f4334ceb860>" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" }, { "data": { "image/png": "\n", "text/plain": [ "<Figure size 432x432 with 2 Axes>" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "from pyspark.ml.stat import Correlation\n", "from pyspark.ml.feature import VectorAssembler\n", "\n", "changedTrain = train.withColumn(labelCol+'Double', train[labelCol].cast(\"double\")).drop(labelCol)\n", "\n", "numericCols = changedTrain.columns\n", "\n", "corrAssembler = VectorAssembler(inputCols=numericCols, outputCol='corrFeatures')\n", "corrTrain = corrAssembler.transform(changedTrain)\n", "corrMat = Correlation.corr(corrTrain, 'corrFeatures').head()\n", "\n", "pdf = corrMat[0].toArray()\n", "\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "\n", "plt.figure(figsize=(6,6))\n", "\n", "sns.heatmap(pdf, \n", " xticklabels=numericCols,\n", " yticklabels=numericCols)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There's some moderate positive correlation between 'citric acid' and 'fixed acidity', 'free sulfur dioxide' and 'total sulfur dioxide', 'fixed acidity' and 'density'" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0------------------------------------\n", " fixed acidity | 1.7339054079326566 \n", " volatile acidity | 0.18387071977423192 \n", " citric acid | 0.1937298802160502 \n", " residual sugar | 1.4816221021751061 \n", " chlorides | 0.045579661274595056 \n", " free sulfur dioxide | 10.680847219823992 \n", " total sulfur dioxide | 32.517503458328534 \n", " density | 0.00191715082907368 \n", " pH | 0.15339972836093962 \n", " sulphates | 0.15549440282020172 \n", " alcohol | 1.0674132842935937 \n", "\n", "-RECORD 
0------------------------------------\n", " fixed acidity | 3.0064279636581124 \n", " volatile acidity | 0.03380844159029412 \n", " citric acid | 0.03753126648852516 \n", " residual sugar | 2.195204053653781 \n", " chlorides | 0.00207750552190682 \n", " free sulfur dioxide | 114.08049733322189 \n", " total sulfur dioxide | 1057.3880311624082 \n", " density | 3.675467301417898... \n", " pH | 0.023531476661210065 \n", " sulphates | 0.02417850930841115 \n", " alcohol | 1.1393711194864362 \n", "\n" ] } ], "source": [ "from pyspark.sql.functions import stddev_pop, var_pop\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "stddev_pops = [stddev_pop(column).alias(column) for column in cols]\n", "var_pops = [var_pop(column).alias(column) for column in cols]\n", "\n", "df1 = train.agg(*stddev_pops)\n", "df2 = train.agg(*var_pops)\n", "df1.show(truncate=True, vertical=True)\n", "df2.show(truncate=True, vertical=True)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "%%script false\n", "from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler\n", "from pyspark.ml import Pipeline\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "assembler = VectorAssembler(inputCols=cols, outputCol=\"assembledFeatures\")\n", "scalers = StandardScaler(inputCol=\"assembledFeatures\", outputCol=\"features\", withStd=True, withMean=True)\n", "pca = PCA(k=5, inputCol=\"features\", outputCol=\"pcaFeatures\")\n", "pipeline = Pipeline(stages=[assembler, scalers, pca])\n", "model = pipeline.fit(train)\n", "pcaTrain = model.transform(train)\n", "pcaTrain.select(\"features\", \"pcaFeatures\").show()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "DataFrame[fixed acidity: double, volatile acidity: double, citric acid: double, residual sugar: double, chlorides: double, free sulfur dioxide: double, total sulfur dioxide: double, density: double, pH: 
double, sulphates: double, alcohol: double, quality: int]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "train" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Classifiers" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "from pyspark.ml import Pipeline\n", "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n", "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n", "import pandas as pd\n", "\n", "accuracyDataFrame = pd.DataFrame(columns=['Train', 'Test'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Regressor" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Train</th>\n", " <th>Test</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>GBTRegressor</th>\n", " <td>0.706159</td>\n", " <td>0.30547</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Train Test\n", "GBTRegressor 0.706159 0.30547" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# %%script false \n", "from pyspark.ml.regression import GBTRegressor\n", "from pyspark.ml import Pipeline\n", "from pyspark.ml.tuning import CrossValidator, ParamGridBuilder\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "gbtRegressorTrain = train\n", "\n", "assembler = VectorAssembler(inputCols=cols, outputCol='features')\n", "regressor = 
GBTRegressor(labelCol='quality')\n", "pipeline = Pipeline(stages=[assembler, regressor])\n", "grid = ParamGridBuilder().addGrid(regressor.maxIter, [5, 10, 15]).build()\n", "crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps = grid,\n", " evaluator = RegressionEvaluator(labelCol='quality'),\n", " numFolds=10)\n", "model = crossval.fit(gbtRegressorTrain)\n", "gbtRegressorTrain = model.transform(gbtRegressorTrain)\n", "gbtRegressorTest = model.transform(test)\n", "\n", "evaluator = RegressionEvaluator(labelCol='quality')\n", "\n", "trainAccuracy = evaluator.evaluate(gbtRegressorTrain, {evaluator.metricName: \"r2\"})\n", "testAccuracy = evaluator.evaluate(gbtRegressorTest, {evaluator.metricName: \"r2\"})\n", "accuracyDataFrame.loc['GBTRegressor'] = [trainAccuracy, testAccuracy]\n", "accuracyDataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RandomForestClassifier" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Train</th>\n", " <th>Test</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>GBTRegressor</th>\n", " <td>0.706159</td>\n", " <td>0.305470</td>\n", " </tr>\n", " <tr>\n", " <th>RandomForestClassifier</th>\n", " <td>0.693101</td>\n", " <td>0.597633</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Train Test\n", "GBTRegressor 0.706159 0.305470\n", "RandomForestClassifier 0.693101 0.597633" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.ml.classification import 
RandomForestClassifier\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "randomForestClassifierTrain = train\n", "randomForestClassifierTest = test\n", "\n", "assembler = VectorAssembler(inputCols=cols, outputCol='features')\n", "classifier = RandomForestClassifier(labelCol='quality')\n", "pipeline = Pipeline(stages=[assembler, classifier])\n", "grid = ParamGridBuilder().addGrid(classifier.maxDepth, [3, 5])\\\n", " .addGrid(classifier.numTrees, [20, 50])\\\n", " .build()\n", "crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps = grid,\n", " evaluator = MulticlassClassificationEvaluator(labelCol='quality'),\n", " numFolds=10)\n", "model = crossval.fit(randomForestClassifierTrain)\n", "randomForestClassifierTrain = model.transform(randomForestClassifierTrain)\n", "randomForestClassifierTest = model.transform(randomForestClassifierTest)\n", "\n", "evaluator = MulticlassClassificationEvaluator(labelCol='quality')\n", "\n", "trainAccuracy = evaluator.evaluate(randomForestClassifierTrain, {evaluator.metricName: \"accuracy\"})\n", "testAccuracy = evaluator.evaluate(randomForestClassifierTest, {evaluator.metricName: \"accuracy\"})\n", "\n", "accuracyDataFrame.loc['RandomForestClassifier'] = [trainAccuracy, testAccuracy]\n", "accuracyDataFrame" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Train</th>\n", " <th>Test</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>GBTRegressor</th>\n", " <td>0.706159</td>\n", " <td>0.305470</td>\n", " </tr>\n", " <tr>\n", " 
<th>RandomForestClassifier</th>\n", " <td>0.693101</td>\n", " <td>0.597633</td>\n", " </tr>\n", " <tr>\n", " <th>LogisticRegression</th>\n", " <td>0.598731</td>\n", " <td>0.535503</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Train Test\n", "GBTRegressor 0.706159 0.305470\n", "RandomForestClassifier 0.693101 0.597633\n", "LogisticRegression 0.598731 0.535503" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.ml.classification import LogisticRegression\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "logisticRegressionTrain = train\n", "logisticRegressionTest = test\n", "\n", "assembler = VectorAssembler(inputCols=cols, outputCol='features')\n", "classifier = LogisticRegression(labelCol='quality')\n", "pipeline = Pipeline(stages=[assembler, classifier])\n", "grid = ParamGridBuilder().addGrid(classifier.regParam, [0.1, 0.15])\\\n", " .addGrid(classifier.maxIter, [100, 200, 300])\\\n", " .build()\n", "crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps = grid,\n", " evaluator = MulticlassClassificationEvaluator(labelCol='quality'),\n", " numFolds=10)\n", "model = crossval.fit(logisticRegressionTrain)\n", "logisticRegressionTrain = model.transform(logisticRegressionTrain)\n", "logisticRegressionTest = model.transform(logisticRegressionTest)\n", "\n", "evaluator = MulticlassClassificationEvaluator(labelCol='quality')\n", "\n", "trainAccuracy = evaluator.evaluate(logisticRegressionTrain, {evaluator.metricName: \"accuracy\"})\n", "testAccuracy = evaluator.evaluate(logisticRegressionTest, {evaluator.metricName: \"accuracy\"})\n", "\n", "accuracyDataFrame.loc['LogisticRegression'] = [trainAccuracy, testAccuracy]\n", "accuracyDataFrame" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## MultilayerPerceptronClassifier" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "from 
pyspark.ml.pipeline import Estimator, Model, Pipeline\n",
    "from pyspark.ml.param import Param, Params\n",
    "from pyspark.ml.param.shared import HasInputCol, HasOutputCol\n",
    "# Module alias instead of `from pyspark.sql.functions import min`, which shadowed Python's builtin min().\n",
    "from pyspark.sql import functions as F\n",
    "\n",
    "# Custom pipeline stages: MinTransformation shifts the integer label down by its minimum\n",
    "# (so the smallest label becomes 0) and ConstTransformation shifts the predictions back up\n",
    "# by the same offset, which it reads from the shifted column's metadata.\n",
    "\n",
    "class HasMin(Params):\n",
    "    \"\"\"Mixin holding the `minimum` param: the offset subtracted from the input column.\"\"\"\n",
    "\n",
    "    minimum = Param(Params._dummy(), \"minimum\", \"minimum\")\n",
    "\n",
    "    def __init__(self):\n",
    "        super(HasMin, self).__init__()\n",
    "\n",
    "    def setMin(self, value):\n",
    "        return self._set(minimum=value)\n",
    "\n",
    "    def getMin(self):\n",
    "        return self.getOrDefault(self.minimum)\n",
    "\n",
    "\n",
    "class MinTransformation(Estimator, HasInputCol, HasOutputCol, HasMin):\n",
    "    \"\"\"Learns min(inputCol); the fitted model writes inputCol - min to outputCol.\n",
    "\n",
    "    Raises ValueError if inputCol has no non-null values.\n",
    "    \"\"\"\n",
    "\n",
    "    def _fit(self, dataset):\n",
    "        c = self.getInputCol()\n",
    "        # Keep the learned value in a local: assigning it to self.minimum replaced the\n",
    "        # `minimum` Param object on this estimator, breaking later getMin() calls on it.\n",
    "        minimum = dataset.agg(F.min(c)).first()[0]\n",
    "        if minimum is None:\n",
    "            raise ValueError('MinTransformation: column %r has no non-null values' % c)\n",
    "        return (MinTransformationModel()\n",
    "                .setInputCol(c)\n",
    "                .setMin(minimum)\n",
    "                .setOutputCol(self.getOutputCol()))\n",
    "\n",
    "\n",
    "class MinTransformationModel(Model, HasInputCol, HasOutputCol, HasMin):\n",
    "    \"\"\"Writes inputCol - minimum to outputCol and records the offset in that column's metadata.\"\"\"\n",
    "\n",
    "    def _transform(self, dataset):\n",
    "        x = self.getInputCol()\n",
    "        y = self.getOutputCol()\n",
    "        minimum = self.getMin()\n",
    "        # ConstTransformation looks up this metadata key to undo the shift.\n",
    "        meta = {'MinTransformation' : minimum}\n",
    "        return dataset.withColumn(y, (F.col(x) - minimum).alias(y, metadata=meta))\n",
    "\n",
    "\n",
    "class HasConst(Params):\n",
    "    \"\"\"Mixin holding the `const` param: the offset added back to the input column.\"\"\"\n",
    "\n",
    "    const = Param(Params._dummy(), \"const\", \"const\")\n",
    "\n",
    "    def __init__(self):\n",
    "        super(HasConst, self).__init__()\n",
    "\n",
    "    def setConst(self, value):\n",
    "        return self._set(const=value)\n",
    "\n",
    "    def getConst(self):\n",
    "        return self.getOrDefault(self.const)\n",
    "\n",
    "\n",
    "class ConstTransformation(Estimator, HasInputCol, HasOutputCol, HasConst):\n",
    "    \"\"\"Reads the offset stored by MinTransformationModel; the fitted model adds it back to inputCol.\n",
    "\n",
    "    Raises ValueError if no column of the fitting dataset carries 'MinTransformation' metadata.\n",
    "    \"\"\"\n",
    "\n",
    "    def _fit(self, dataset):\n",
    "        c = self.getInputCol()\n",
    "        # Offset from the first column whose metadata carries it (the MinTransformation output).\n",
    "        # Kept in a local so the `const` Param on this estimator is not overwritten.\n",
    "        const = next((f.metadata['MinTransformation']\n",
    "                      for f in dataset.schema.fields\n",
    "                      if 'MinTransformation' in f.metadata), None)\n",
    "        if const is None:\n",
    "            # Fail early with a clear message instead of passing the unset Param on to setConst.\n",
    "            raise ValueError('ConstTransformation: no column carries MinTransformation metadata; '\n",
    "                             'add a MinTransformation stage earlier in the pipeline')\n",
    "        return (ConstTransformationModel()\n",
    "                .setInputCol(c)\n",
    "                .setConst(const)\n",
    "                .setOutputCol(self.getOutputCol()))\n",
    "\n",
    "\n",
    "class ConstTransformationModel(Model, HasInputCol, HasOutputCol, HasConst):\n",
    "    \"\"\"Writes inputCol + const to outputCol.\"\"\"\n",
    "\n",
    "    def _transform(self, dataset):\n",
    "        x = self.getInputCol()\n",
    "        y = self.getOutputCol()\n",
    "        const = self.getConst()\n",
    "\n",
    "        return dataset.withColumn(y, F.col(x) + const)\n"
 ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>Train</th>\n", " <th>Test</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>GBTRegressor</th>\n", " <td>0.706159</td>\n", " <td>0.305470</td>\n", " </tr>\n", " <tr>\n", " <th>RandomForestClassifier</th>\n", " <td>0.693101</td>\n", " <td>0.597633</td>\n", " </tr>\n", " <tr>\n", " <th>LogisticRegression</th>\n", " <td>0.598731</td>\n", " <td>0.535503</td>\n", " </tr>\n", " <tr>\n", " <th>MultilayerPerceptronClassifier</th>\n", " <td>0.616971</td>\n", " <td>0.529586</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " Train Test\n", "GBTRegressor 0.706159 0.305470\n", "RandomForestClassifier 0.693101 0.597633\n", "LogisticRegression 0.598731 0.535503\n", "MultilayerPerceptronClassifier 0.616971 0.529586" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.ml.classification import MultilayerPerceptronClassifier\n", "\n", "cols = train.columns[:]\n", "cols.remove(labelCol)\n", "\n", "mlpClassifierTrain = train\n", "mlpClassifierTest = test\n", "\n", "minTransformation = MinTransformation().setInputCol('quality').setOutputCol('reQuality')\n", "assembler = VectorAssembler(inputCols=cols, outputCol='features')\n", "classifier = MultilayerPerceptronClassifier(labelCol='reQuality')\n", "constTransformation = 
ConstTransformation().setInputCol('prediction').setOutputCol('actualPrediction')\n", "\n", "pipeline = Pipeline(stages=[minTransformation, assembler, classifier, constTransformation])\n", "grid = ParamGridBuilder().addGrid(classifier.maxIter, [100, 200])\\\n", " .addGrid(classifier.layers, [[11, 10, 10, 6], [11, 20, 10, 6]])\\\n", " .build()\n", "crossval = CrossValidator(estimator=pipeline,\n", " estimatorParamMaps = grid,\n", " evaluator = MulticlassClassificationEvaluator(labelCol='quality', predictionCol='actualPrediction'),\n", " numFolds=10)\n", "\n", "model = crossval.fit(mlpClassifierTrain)\n", "mlpClassifierTrain = model.transform(mlpClassifierTrain)\n", "mlpClassifierTest = model.transform(mlpClassifierTest)\n", "\n", "evaluator = MulticlassClassificationEvaluator(labelCol='quality', predictionCol='actualPrediction')\n", "\n", "trainAccuracy = evaluator.evaluate(mlpClassifierTrain, {evaluator.metricName: \"accuracy\"})\n", "testAccuracy = evaluator.evaluate(mlpClassifierTest, {evaluator.metricName: \"accuracy\"})\n", "\n", "accuracyDataFrame.loc['MultilayerPerceptronClassifier'] = [trainAccuracy, testAccuracy]\n", "accuracyDataFrame" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4" } }, "nbformat": 4, "nbformat_minor": 2 }