Spark's MLlib machine-learning library provides a collaborative filtering implementation based on Alternating Least Squares (ALS). The MLlib implementation has these parameters:

* numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
* rank is the number of latent factors in the model.
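* iterations is the number of iterations to run (named `maxIter` in the `pyspark.ml` API used in this notebook).
* lambda specifies the regularization parameter in ALS (named `regParam` in the `pyspark.ml` API used in this notebook).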
* implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
* alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

In [0]:
from pyspark.sql import SparkSession

In [0]:
# Get the active Spark session, or start a new one named 'rec'.
spark = (
    SparkSession.builder
    .appName('rec')
    .getOrCreate()
)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [0]:
# Load the ratings CSV: the header row supplies the column names and Spark
# infers each column's type from the values.
ratings_path = "dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/movielens_ratings.csv"
data = spark.read.csv(ratings_path, inferSchema=True, header=True)

In [0]:
# Peek at the first row to check the inferred column names and types.
data.head()

Out[5]: Row(movieId=2, rating=3.0, userId=0)

In [0]:
# Summary statistics (count, mean, stddev, min, max) for every column.
data.describe().show()

+-------+------------------+------------------+------------------+
|summary|           movieId|            rating|            userId|
+-------+------------------+------------------+------------------+
|  count|              1501|              1501|              1501|
|   mean| 49.40572951365756|1.7741505662891406|14.383744170552964|
| stddev|28.937034065088994| 1.187276166124803| 8.591040424293272|
|    min|                 0|               1.0|                 0|
|    max|                99|               5.0|                29|
+-------+------------------+------------------+------------------+



In [0]:
# Hold out roughly 20% of the ratings for evaluation. A fixed seed keeps the
# split stable across re-runs, so the metrics computed from it are repeatable.
(training, test) = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Explicit-feedback ALS on (userId, movieId, rating).
# coldStartStrategy="drop" discards rows whose user or item was not seen while
# fitting; without it such rows receive NaN predictions, which turns the RMSE
# computed from them into NaN.
# seed fixes the random initialisation so repeated runs give the same model.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training)

In [0]:
# Score the held-out ratings; the result carries an added `prediction` column.
predictions = model.transform(test)

In [0]:
# Show the first rows: actual rating next to the model's prediction.
predictions.show()

+-------+------+------+-----------+
|movieId|rating|userId| prediction|
+-------+------+------+-----------+
|      2|   2.0|     1|  1.9871931|
|      1|   1.0|     6| 0.28594762|
|      4|   1.0|     7|   2.146959|
|      0|   1.0|     8|  1.8356189|
|      4|   2.0|     8| 0.87620103|
|      2|   3.0|     9|  2.1391335|
|      4|   1.0|     9|  2.4214845|
|      0|   1.0|    11| -1.3260899|
|      2|   1.0|    12|  3.1861515|
|      3|   1.0|    13|  2.1452992|
|      4|   2.0|    13|  0.7788123|
|      2|   1.0|    15|  2.4739377|
|      2|   1.0|    17|  -2.657514|
|      3|   1.0|    17|  0.2842377|
|      2|   2.0|    20| -1.2231252|
|      3|   2.0|    22|  0.5278863|
|      0|   1.0|    23| 0.34326315|
|      4|   1.0|    23|  1.9485532|
|      0|   1.0|    27|-0.45441574|
|      0|   3.0|    28|  0.7916339|
+-------+------+------+-----------+
only showing top 20 rows



In [0]:
# Measure prediction quality as the RMSE between `rating` and `prediction`.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 1.783041436897024


In [0]:
# Take the held-out (movieId, userId) pairs for user 11, leaving the ratings out.
is_user_11 = test['userId'] == 11
single_user = test.filter(is_user_11).select(['movieId', 'userId'])

In [0]:
# List the movies that will be scored for this user.
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|      0|    11|
|      9|    11|
|     12|    11|
|     20|    11|
|     25|    11|
|     43|    11|
|     47|    11|
|     51|    11|
|     66|    11|
|     75|    11|
|     81|    11|
|     97|    11|
|     99|    11|
+-------+------+



In [0]:
# Predict this user's rating for each of the selected movies.
# (The variable name's spelling is kept because the next cell refers to it.)
reccomendations = model.transform(single_user)

In [0]:
# Rank the movies for this user from highest to lowest predicted rating.
ranked = reccomendations.orderBy('prediction', ascending=False)
ranked.show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|     47|    11| 2.4248514|
|     20|    11| 1.6464235|
|     43|    11| 1.3852543|
|      9|    11|  1.372124|
|     97|    11| 1.0738724|
|     25|    11| 1.0302699|
|     99|    11|0.95755816|
|     51|    11|0.62003374|
|     81|    11| 0.5865185|
|     12|    11|   0.44638|
|     75|    11|0.27664962|
|     66|    11|0.22641006|
|      0|    11|-1.3260899|
+-------+------+----------+



Mapping from `mealskew` id to meal name:

 { 2. : "Chicken Curry",   
           3. : "Spicy Chicken Nuggest",   
           5. : "Hamburger",   
           9. : "Taco Surprise",  
           11. : "Meatloaf",  
           12. : "Ceaser Salad",  
           15. : "BBQ Ribs",  
           17. : "Sushi Plate",  
           19. : "Cheesesteak Sandwhich",  
           21. : "Lasagna",  
           23. : "Orange Chicken",
           26. : "Spicy Beef Plate",  
           27. : "Salmon with Mashed Potatoes",  
           28. : "Penne Tomatoe Pasta",  
           29. : "Pork Sliders",  
           30. : "Vietnamese Sandwich",  
           31. : "Chicken Wrap",  
           np.nan: "Cowboy Burger",   
           4. : "Pretzels and Cheese Plate",   
           6. : "Spicy Pork Sliders",  
           13. : "Mandarin Chicken PLate",  
           14. : "Kung Pao Chicken",
           16. : "Fried Rice Plate",  
           8. : "Chicken Chow Mein",  
           10. : "Roasted Eggplant ",  
           18. : "Pepperoni Pizza",  
           22. : "Pulled Pork Plate",   
           0. : "Cheese Pizza",   
           1. : "Burrito",   
           7. : "Nachos",  
           24. : "Chili",  
           20. : "Southwest Salad",  
           25.: "Roast Beef Sandwich"}

In [0]:
# Load the ratings file that also carries meal information. This rebinds `data`,
# replacing the ratings frame loaded earlier in the notebook.
meal_info_path = "dbfs:/FileStore/shared_uploads/dizhen@hsph.harvard.edu/Meal_Info.csv"
data = spark.read.csv(meal_info_path, inferSchema=True, header=True)

In [0]:
# Keep only rows that have a meal id before splitting: ALS rejects null/NaN
# ids, and `mealskew` appears to be missing for some rows (the mapping above
# contains an np.nan key).
# NOTE(review): missing ids are the likely cause of the Py4JJavaError raised by
# the fit below; the recorded traceback is truncated, so confirm on a re-run.
meal_ratings = data.na.drop(subset=["mealskew"])

# Hold out roughly 20% for evaluation; the seed makes the split reproducible.
(training, test) = meal_ratings.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Explicit-feedback ALS using the meal id (`mealskew`) as the item column.
# coldStartStrategy="drop" discards rows whose user or item was not seen while
# fitting; without it such rows receive NaN predictions, which turns the RMSE
# computed from them into NaN.
# seed fixes the random initialisation so repeated runs give the same model.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="mealskew",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)
model = als.fit(training)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-1680652028931040>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0mals[0m [0;34m=[0m [0mALS[0m[0;34m([0m[0mmaxIter[0m[0;34m=[0m[0;36m5[0m[0;34m,[0m [0mregParam[0m[0;34m=[0m[0;36m0.01[0m[0;34m,[0m [0muserCol[0m[0;34m=[0m[0;34m"userId"[0m[0;34m,[0m [0mitemCol[0m[0;34m=[0m[0;34m"mealskew"[0m[0;34m,[0m [0mratingCol[0m[0;34m=[0m[0;34m"rating"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0mmodel[0m [0;34m=[0m [0mals[0m[0;34m.[0m[0mfit[0m[0;34m([0m[0mtraining[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/python_shell/dbruntime/MLWorkloadsInstrumentation/_pyspark.py[0m in [0;36mpatched_method[0;34m(self, *args, **kwargs)[0m
[1;32m     28[0m             [0mcall_succeeded[0m [0;34m=[0m [0;32mFal

In [0]:
# Score the held-out meal ratings and display them.
# NOTE(review): this relies on `model` being the result of the `mealskew` fit
# above. In the recorded run that fit raised, and the predictions shown equal
# the earlier movieId model's values (e.g. 1.9871931 for userId 1 / movieId 2),
# so a stale `model` was used. Re-run from a fresh kernel before trusting the
# numbers produced from this cell.
predictions = model.transform(test)
predictions.show()

+-------+------+------+--------+--------------------+----------+
|movieId|rating|userId|mealskew|           meal_name|prediction|
+-------+------+------+--------+--------------------+----------+
|      2|   2.0|     1|     2.0|       Chicken Curry| 1.9871931|
|      3|   1.0|     1|     3.0|Spicy Chicken Nug...|0.90050435|
|      4|   3.0|     2|     4.0|Pretzels and Chee...|  2.912253|
|      0|   1.0|     5|     0.0|        Cheese Pizza| 1.1937842|
|      3|   1.0|     7|     3.0|Spicy Chicken Nug...|  1.367688|
|      4|   1.0|     7|     4.0|Pretzels and Chee...|  2.146959|
|      3|   2.0|     8|     3.0|Spicy Chicken Nug...| 1.7796037|
|      2|   3.0|     9|     2.0|       Chicken Curry| 2.1391335|
|      3|   1.0|     9|     3.0|Spicy Chicken Nug...| 1.0622486|
|      2|   1.0|    12|     2.0|       Chicken Curry| 3.1861515|
|      4|   1.0|    12|     4.0|Pretzels and Chee...|0.73995054|
|      3|   1.0|    13|     3.0|Spicy Chicken Nug...| 2.1452992|
|      1|   4.0|    15|  

In [0]:
# RMSE between `rating` and `prediction` for the current `predictions` frame.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.8463110939439068
