{ "cells": [ { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(fixed acidity=7.4, volatile acidity=0.7, citric acid=0.0, residual sugar=1.9, chlorides=0.076, free sulfur dioxide=11.0, total sulfur dioxide=34.0, density=0.9978, pH=3.51, sulphates=0.56, alcohol=9.4, quality=5)]" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.context import SparkContext\n", "from pyspark.sql.session import SparkSession\n", "sc = SparkContext('local')\n", "spark = SparkSession(sc)\n", "\n", "wine_df = spark.read.csv('winequality-red.csv', header = True, inferSchema=True, sep =';')\n", "wine_df.take(1)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------+------------------+------------------+\n", "|summary| alcohol| quality|\n", "+-------+------------------+------------------+\n", "| count| 1599| 1599|\n", "| mean|10.422983114446502|5.6360225140712945|\n", "| stddev|1.0656675818473935|0.8075694397347051|\n", "| min| 8.4| 3|\n", "| max| 14.9| 8|\n", "+-------+------------------+------------------+\n", "\n" ] } ], "source": [ "wine_df.select(\"alcohol\",\"quality\").describe().show()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Correlation to MPG for fixed acidity 0.12405164911322263\n", "Correlation to MPG for volatile acidity -0.3905577802640061\n", "Correlation to MPG for citric acid 0.22637251431804048\n", "Correlation to MPG for residual sugar 0.013731637340065798\n", "Correlation to MPG for chlorides -0.12890655993005293\n", "Correlation to MPG for free sulfur dioxide -0.05065605724427597\n", "Correlation to MPG for total sulfur dioxide -0.18510028892653774\n", "Correlation to MPG for density -0.17491922778336474\n", "Correlation to MPG for pH -0.0577313912053826\n", "Correlation to MPG for sulphates 0.25139707906925995\n", "Correlation to MPG for alcohol 0.4761663240011364\n", "Correlation to MPG for quality 1.0\n" ] } ], "source": [ "import six\n", "for i in wine_df.columns:\n", " if not( isinstance(wine_df.select(i).take(1)[0][0], six.string_types)):\n", " print( \"Correlation to MPG for \", i, wine_df.stat.corr('quality',i))" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "wine_df = wine_df.drop(\"residual sugar\").drop(\"free sulfur dioxide\") \\\n", " .drop(\"pH\").drop(\"density\") \\\n", " .drop(\"chlorides\").drop('fixed acidity')" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(volatile acidity=0.7, citric acid=0.0, total sulfur dioxide=34.0, sulphates=0.56, alcohol=9.4, quality=5, features=DenseVector([0.7, 0.0, 34.0, 0.56, 9.4]))]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.ml.feature import VectorAssembler\n", "\n", "vectorAssembler = VectorAssembler(inputCols = ['volatile acidity', 'citric acid', 'total sulfur dioxide', 'sulphates', 'alcohol'], outputCol = 'features')\n", "vwine_df = vectorAssembler.transform(wine_df)\n", "vwine_df.take(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Linear Regression" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "splits = vwine_df.randomSplit([0.7, 0.3])\n", "train_df = splits[0]\n", "test_df = splits[1]" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Coefficients: [-1.134803300789282,-0.011468089424519127,-0.0024666032938323525,0.8259746568202124,0.29643567763999223]\n", "Intercept: 2.7325981169381355\n" ] } ], "source": [ "from pyspark.ml.regression import LinearRegression\n", "\n", "lr = LinearRegression(featuresCol = 'features', labelCol='quality', maxIter=10)\n", "lr_model = lr.fit(train_df)\n", "print(\"Coefficients: \" + str(lr_model.coefficients))\n", "print(\"Intercept: \" + str(lr_model.intercept))" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------------+-------+--------------------+\n", "| prediction|quality| features|\n", "+------------------+-------+--------------------+\n", "| 6.321948236211139| 6|[0.16,0.64,52.0,0...|\n", "| 6.383392525710061| 6|[0.18,0.37,109.0,...|\n", "| 6.260948682710943| 6|[0.18,0.51,23.0,0...|\n", "| 6.260948682710943| 6|[0.18,0.51,23.0,0...|\n", "| 5.361153532922475| 5|[0.19,0.21,135.0,...|\n", "| 6.328172896188763| 6|[0.19,0.42,30.0,0...|\n", "| 6.183009588200807| 6|[0.22,0.24,28.0,0...|\n", "| 6.183009588200807| 6|[0.22,0.24,28.0,0...|\n", "| 5.553702448141423| 6|[0.22,0.48,60.0,0...|\n", "|5.5401318299853965| 4|[0.23,0.37,36.0,0...|\n", "| 6.226681409372491| 7|[0.24,0.35,27.0,0...|\n", "| 6.469942162899238| 7|[0.24,0.42,22.0,1...|\n", "| 6.447170802911483| 7|[0.24,0.46,21.0,1...|\n", "| 6.541619485323386| 6|[0.24,0.49,20.0,1...|\n", "| 6.711646680769182| 7|[0.25,0.39,10.0,0...|\n", "| 6.167801780409271| 6|[0.25,0.46,42.0,0...|\n", "| 5.813409467513248| 6|[0.26,0.42,27.0,0...|\n", "| 5.603333877120943| 5|[0.26,0.45,49.0,0...|\n", "| 6.35712538058033| 6|[0.26,0.48,10.0,0...|\n", "| 6.35712538058033| 6|[0.26,0.48,10.0,0...|\n", "+------------------+-------+--------------------+\n", "only showing top 20 rows\n", "\n" ] } ], "source": [ "predictions = lr_model.transform(test_df)\n", "predictions.select(\"prediction\",\"quality\",\"features\").show()" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "0.3359148546269666" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "#Find R2 for Linear Regression\n", "from pyspark.ml.evaluation import RegressionEvaluator\n", "evaluator = RegressionEvaluator(predictionCol=\"prediction\", \\\n", " labelCol=\"quality\",metricName=\"r2\")\n", "evaluator.evaluate(predictions)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [], "source": [ "sc.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.4" } }, "nbformat": 4, "nbformat_minor": 2 }