In [1]:
# Section 11 : Linear Regression 

In [2]:
# course 11.35

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

In [4]:
spark = SparkSession.builder.appName('linear_regression').getOrCreate()

In [5]:
# load the data
# https://github.com/yennanliu/analysis/blob/master/SPARK_/sample_linear_regression_data.txt

training = spark.read.format('libsvm').load('sample_linear_regression_data.txt')

In [6]:
# take a look on the original data

!head -5 sample_linear_regression_data.txt

-9.490009878824548 1:0.4551273600657362 2:0.36644694351969087 3:-0.38256108933468047 4:-0.4458430198517267 5:0.33109790358914726 6:0.8067445293443565 7:-0.2624341731773887 8:-0.44850386111659524 9:-0.07269284838169332 10:0.5658035575800715
0.2577820163584905 1:0.8386555657374337 2:-0.1270180511534269 3:0.499812362510895 4:-0.22686625128130267 5:-0.6452430441812433 6:0.18869982177936828 7:-0.5804648622673358 8:0.651931743775642 9:-0.6555641246242951 10:0.17485476357259122
-4.438869807456516 1:0.5025608135349202 2:0.14208069682973434 3:0.16004976900412138 4:0.505019897181302 5:-0.9371635223468384 6:-0.2841601610457427 7:0.6355938616712786 8:-0.1646249064941625 9:0.9480713629917628 10:0.42681251564645817
-19.782762789614537 1:-0.0388509668871313 2:-0.4166870051763918 3:0.8997202693189332 4:0.6409836467726933 5:0.273289095712564 6:-0.26175701211620517 7:-0.2794902492677298 8:-0.1306778297187794 9:-0.08536581111046115 10:-0.05462315824828923
-7.966593841555266 1:-0.06195495876886281 2:0

In [7]:
training

DataFrame[label: double, features: vector]

In [8]:
# HERE YOU CAN SEE THE DATASET IS WITH "LABEL" AND "FEATURES" COLUMNS
# WHICH IS THE DEFAULT SCHEMA THAT SPARK ML-LIB CAN RUN MODELING WITH 
##### label: double, features: vector #####

training.show()

+-------------------+--------------------+
| label| features|
+-------------------+--------------------+
| -9.490009878824548|(10,[0,1,2,3,4,5,...|
| 0.2577820163584905|(10,[0,1,2,3,4,5,...|
| -4.438869807456516|(10,[0,1,2,3,4,5,...|
|-19.782762789614537|(10,[0,1,2,3,4,5,...|
| -7.966593841555266|(10,[0,1,2,3,4,5,...|
| -7.896274316726144|(10,[0,1,2,3,4,5,...|
| -8.464803554195287|(10,[0,1,2,3,4,5,...|
| 2.1214592666251364|(10,[0,1,2,3,4,5,...|
| 1.0720117616524107|(10,[0,1,2,3,4,5,...|
|-13.772441561702871|(10,[0,1,2,3,4,5,...|
| -5.082010756207233|(10,[0,1,2,3,4,5,...|
| 7.887786536531237|(10,[0,1,2,3,4,5,...|
| 14.323146365332388|(10,[0,1,2,3,4,5,...|
|-20.057482615789212|(10,[0,1,2,3,4,5,...|
|-0.8995693247765151|(10,[0,1,2,3,4,5,...|
| -19.16829262296376|(10,[0,1,2,3,4,5,...|
| 5.601801561245534|(10,[0,1,2,3,4,5,...|
|-3.2256352187273354|(10,[0,1,2,3,4,5,...|
| 1.5299675726687754|(10,[0,1,2,3,4,5,...|
| -0.250102447941961|(10,[0,1,2,3,4,5,...|
+-------------------+----------------

## 0) simple training 

In [9]:
# create the model 

lr = LinearRegression(featuresCol='features',
 labelCol= 'label',
 predictionCol = 'prediciton')

In [10]:
# train the model 
lrModel = lr.fit(training)

In [11]:
# print output 

print ('coefficients : ', lrModel.coefficients)
print ('intercept : ', lrModel.intercept)

coefficients : [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931]
intercept : 0.14228558260358093


In [12]:
# use train cummary 

training_summary = lrModel.summary
print ('r2 : ', training_summary.r2)
print ('rootMeanSquaredError : ', training_summary.rootMeanSquaredError)


r2 : 0.027839179518600154
rootMeanSquaredError : 10.16309157133015


## 1) train - test data split and prediciton 

In [13]:
all_data = spark.read.format('libsvm').load('sample_linear_regression_data.txt')


In [14]:
# random split 

train_data, test_data = all_data.randomSplit([.7,.3])

In [15]:
print ( 'all data count : ', all_data.count())
print ( 'train data count : ', train_data.count())
print ( 'test data count : ', test_data.count())

all data count : 501
train data count : 365
test data count : 136


In [16]:
# train again only on train data 
correct_model = lr.fit(training)

In [17]:
# test on test data 

test_results = correct_model.evaluate(test_data)

In [18]:
# print the results (test data)

# use train cummary 

print ('r2 : ', test_results.r2)
print ('rootMeanSquaredError : ', test_results.rootMeanSquaredError)


r2 : 0.08455715143231024
rootMeanSquaredError : 9.691326744797616


In [19]:
unlabeled_data = test_data.select('features')

In [20]:
# have some test data to run the followign prdiciton 
unlabeled_data.show()

+--------------------+
| features|
+--------------------+
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
|(10,[0,1,2,3,4,5,...|
+--------------------+
only showing top 20 rows



In [21]:
# MAKE PREDICTION 
# NOTE : IN SPARK ML-LIB, PREDICT IS RUN BY "transform" COMMAND 

predictions = correct_model.transform(unlabeled_data)

In [22]:
# show prediction 
predictions.show()

+--------------------+--------------------+
| features| prediciton|
+--------------------+--------------------+
|(10,[0,1,2,3,4,5,...| -3.5124943764463135|
|(10,[0,1,2,3,4,5,...| -3.147868811718382|
|(10,[0,1,2,3,4,5,...| -2.499423280435292|
|(10,[0,1,2,3,4,5,...| 1.7010353768556734|
|(10,[0,1,2,3,4,5,...| -0.5388564818088987|
|(10,[0,1,2,3,4,5,...| -1.475284763550391|
|(10,[0,1,2,3,4,5,...| -0.7489108841213971|
|(10,[0,1,2,3,4,5,...| -2.508322852836744|
|(10,[0,1,2,3,4,5,...| -0.976510689078842|
|(10,[0,1,2,3,4,5,...| -0.9566138722165072|
|(10,[0,1,2,3,4,5,...| 3.7236186142728274|
|(10,[0,1,2,3,4,5,...| 1.2421598960943985|
|(10,[0,1,2,3,4,5,...| -0.7195663865895121|
|(10,[0,1,2,3,4,5,...| -1.780965034607929|
|(10,[0,1,2,3,4,5,...|-0.06740884917840151|
|(10,[0,1,2,3,4,5,...| 2.746996971787099|
|(10,[0,1,2,3,4,5,...| 0.5789191740943999|
|(10,[0,1,2,3,4,5,...| -1.2048075065353916|
|(10,[0,1,2,3,4,5,...| -0.6964026254414395|
|(10,[0,1,2,3,4,5,...| -3.0756131143558623|
+-------------------

In [23]:
# end of 11.35 
# next : 11.36

## 2) * * * Load csv and transform to the data can be trained by Spark MLIB

In [24]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [25]:
data = spark.read.csv('boston.csv', inferSchema=True, header=True)

In [26]:
data.columns

['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT',
 'price']

In [27]:
data.printSchema()

root
 |-- CRIM: double (nullable = true)
 |-- ZN: double (nullable = true)
 |-- INDUS: double (nullable = true)
 |-- CHAS: double (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: double (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: double (nullable = true)
 |-- TAX: double (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: double (nullable = true)
 |-- price: double (nullable = true)



In [28]:
data.show(2)

+-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+
| CRIM| ZN|INDUS|CHAS| NOX| RM| AGE| DIS|RAD| TAX|PTRATIO| B|LSTAT|price|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+
|0.00632|18.0| 2.31| 0.0|0.538|6.575|65.2| 4.09|1.0|296.0| 15.3|396.9| 4.98| 24.0|
|0.02731| 0.0| 7.07| 0.0|0.469|6.421|78.9|4.9671|2.0|242.0| 17.8|396.9| 9.14| 21.6|
+-------+----+-----+----+-----+-----+----+------+---+-----+-------+-----+-----+-----+
only showing top 2 rows



In [29]:
#################################################################################
# 
# -- transform csv to feature to be access by SPARK MLIB --
# all numerical cols as feature (except price), set price as target to predict 
#
#
#
#
#
#################################################################################

input_cols_ = ['CRIM',
 'ZN',
 'INDUS',
 'CHAS',
 'NOX',
 'RM',
 'AGE',
 'DIS',
 'RAD',
 'TAX',
 'PTRATIO',
 'B',
 'LSTAT']


# -------------------------------------------------------------------# 

# transform input_cols_ --> feature (for following ML using) #
assembler = VectorAssembler(inputCols= input_cols_, outputCol = 'features')

# -------------------------------------------------------------------# 

In [30]:
# run the transformation 
output = assembler.transform(data)

In [31]:
# show the transformed feature col 
output.select('features').show()

+--------------------+
| features|
+--------------------+
|[0.00632,18.0,2.3...|
|[0.02731,0.0,7.07...|
|[0.02729,0.0,7.07...|
|[0.03237,0.0,2.18...|
|[0.06905,0.0,2.18...|
|[0.02985,0.0,2.18...|
|[0.08829,12.5,7.8...|
|[0.14455,12.5,7.8...|
|[0.21124,12.5,7.8...|
|[0.17004,12.5,7.8...|
|[0.22489,12.5,7.8...|
|[0.11747,12.5,7.8...|
|[0.09378,12.5,7.8...|
|[0.62976,0.0,8.14...|
|[0.63796,0.0,8.14...|
|[0.62739,0.0,8.14...|
|[1.05393,0.0,8.14...|
|[0.7842,0.0,8.14,...|
|[0.80271,0.0,8.14...|
|[0.7258,0.0,8.14,...|
+--------------------+
only showing top 20 rows



In [32]:
# create the final data for training ( feature as input, price as prediction output)
final_data = output.select('features', 'price')

In [33]:
final_data.show()

+--------------------+-----+
| features|price|
+--------------------+-----+
|[0.00632,18.0,2.3...| 24.0|
|[0.02731,0.0,7.07...| 21.6|
|[0.02729,0.0,7.07...| 34.7|
|[0.03237,0.0,2.18...| 33.4|
|[0.06905,0.0,2.18...| 36.2|
|[0.02985,0.0,2.18...| 28.7|
|[0.08829,12.5,7.8...| 22.9|
|[0.14455,12.5,7.8...| 27.1|
|[0.21124,12.5,7.8...| 16.5|
|[0.17004,12.5,7.8...| 18.9|
|[0.22489,12.5,7.8...| 15.0|
|[0.11747,12.5,7.8...| 18.9|
|[0.09378,12.5,7.8...| 21.7|
|[0.62976,0.0,8.14...| 20.4|
|[0.63796,0.0,8.14...| 18.2|
|[0.62739,0.0,8.14...| 19.9|
|[1.05393,0.0,8.14...| 23.1|
|[0.7842,0.0,8.14,...| 17.5|
|[0.80271,0.0,8.14...| 20.2|
|[0.7258,0.0,8.14,...| 18.2|
+--------------------+-----+
only showing top 20 rows



In [34]:
# train, test split 

train_data, test_data = final_data.randomSplit([0.7, 0.3])

In [35]:
print ( 'all data count : ', final_data.count())
print ( 'train data count : ', train_data.count())
print ( 'test data count : ', test_data.count())

all data count : 506
train data count : 345
test data count : 161


In [36]:
train_data.describe().show()

+-------+------------------+
|summary| price|
+-------+------------------+
| count| 345|
| mean|22.602898550724642|
| stddev| 9.311339220842957|
| min| 5.0|
| max| 50.0|
+-------+------------------+



In [37]:
# now, training with linear regression again 

lr = LinearRegression(labelCol='price')

In [38]:
lr_model = lr.fit(train_data)

In [39]:
test_result = lr_model.evaluate(test_data)

In [40]:
# print the test data fitting results 

test_result.residuals.show()

+--------------------+
| residuals|
+--------------------+
| 2.820847914698863|
| 10.092829367610015|
| 7.383702839871042|
| 5.77379335027215|
| 1.439135362533321|
| 6.360009488793956|
| -5.894733583181768|
| -3.771279882265347|
|-0.10009484396961454|
| 5.50501225227282|
| 1.1319632754860187|
| -5.434888522994932|
| 7.662804510616631|
|-0.03649746518004804|
| -6.7716019362626625|
| -3.257293842284902|
| -0.5461996288341915|
| -1.0521216424280162|
| -3.452964004804734|
| -11.084224152768945|
+--------------------+
only showing top 20 rows



In [41]:
# print the test data fitting results 


print ('* r2 : ', test_result.r2)
print ('* rootMeanSquaredError : ', test_result.rootMeanSquaredError)


print ('* coefficients : ', lrModel.coefficients)
print ('* intercept : ', lrModel.intercept)


* r2 : 0.7225741051946737
* rootMeanSquaredError : 4.712121102879971
* coefficients : [0.0073350710225801715,0.8313757584337543,-0.8095307954684084,2.441191686884721,0.5191713795290003,1.1534591903547016,-0.2989124112808717,-0.5128514186201779,-0.619712827067017,0.6956151804322931]
* intercept : 0.14228558260358093


In [42]:
# compare with the input data 

# 1) rootMeanSquaredError : 4.482524071400767 VS mean : 22.53280632411069
# 2) r2 : 0.7534954131369154 --> means the model only covers ~ 75% of the variant of the data, which is not a very good model 

final_data.describe().show()

+-------+------------------+
|summary| price|
+-------+------------------+
| count| 506|
| mean|22.532806324110698|
| stddev| 9.197104087379815|
| min| 5.0|
| max| 50.0|
+-------+------------------+



In [43]:
# run the trained model with test data ( the data the model hasn't seen before)
unlabeled_data = test_data.select('features')

In [44]:
unlabeled_data.show()

+--------------------+
| features|
+--------------------+
|[0.01301,35.0,1.5...|
|[0.01381,80.0,0.4...|
|[0.01538,90.0,3.7...|
|[0.01709,90.0,2.0...|
|[0.01965,80.0,1.7...|
|[0.02177,82.5,2.0...|
|[0.02498,0.0,1.89...|
|[0.02731,0.0,7.07...|
|[0.02763,75.0,2.9...|
|[0.02899,40.0,1.2...|
|[0.03113,0.0,4.39...|
|[0.03445,82.5,2.0...|
|[0.0351,95.0,2.68...|
|[0.03548,80.0,3.6...|
|[0.03584,80.0,3.3...|
|[0.03615,80.0,4.9...|
|[0.03738,0.0,5.19...|
|[0.04297,52.5,5.3...|
|[0.0456,0.0,13.89...|
|[0.04741,0.0,11.9...|
+--------------------+
only showing top 20 rows



In [47]:
# run the prediction 
predictions = lr_model.transform(unlabeled_data)

predictions.show()

+--------------------+------------------+
| features| prediction|
+--------------------+------------------+
|[0.01301,35.0,1.5...| 29.87915208530114|
|[0.01381,80.0,0.4...|39.907170632389985|
|[0.01538,90.0,3.7...| 36.61629716012896|
|[0.01709,90.0,2.0...| 24.32620664972785|
|[0.01965,80.0,1.7...| 18.66086463746668|
|[0.02177,82.5,2.0...| 35.93999051120604|
|[0.02498,0.0,1.89...|22.394733583181768|
|[0.02731,0.0,7.07...| 25.37127988226535|
|[0.02763,75.0,2.9...|30.900094843969615|
|[0.02899,40.0,1.2...| 21.09498774772718|
|[0.03113,0.0,4.39...| 16.36803672451398|
|[0.03445,82.5,2.0...|29.534888522994933|
|[0.0351,95.0,2.68...| 40.83719548938337|
|[0.03548,80.0,3.6...|20.936497465180047|
|[0.03584,80.0,3.3...|30.271601936262662|
|[0.03615,80.0,4.9...| 31.1572938422849|
|[0.03738,0.0,5.19...| 21.24619962883419|
|[0.04297,52.5,5.3...|25.852121642428017|
|[0.0456,0.0,13.89...|26.752964004804735|
|[0.04741,0.0,11.9...|22.984224152768945|
+--------------------+------------------+
only showin

In [46]:
# end of 11.37 
# next : 11.38 