# RePlay Tutorial
This notebook introduces the RePlay library and covers:
- data preprocessing
- data splitting
- model training and inference
- model optimization
- model saving and loading
- model comparison

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%config Completer.use_jedi = False

In [3]:
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

In [4]:
import pandas as pd
from pyspark.sql.functions import rand

from replay.data_preparator import DataPreparator
from replay.experiment import Experiment
from replay.metrics import Coverage, HitRate, NDCG, MAP
from replay.model_handler import save, load
from replay.models import ALSWrap, KNN, SLIM
from replay.session_handler import State
from replay.splitters import UserSplitter
from replay.utils import convert2spark

In [5]:
K = 5
SEED = 1234

## 0. Data preprocessing <a name='data-preparator'></a>
We will use MovieLens 1m as an example.

In [6]:
df = pd.read_csv("data/ml1m_ratings.dat", sep="\t", names=["user_id", "item_id", "relevance", "timestamp"])
users = pd.read_csv("data/ml1m_users.dat", sep="\t", names=["user_id", "gender", "age", "occupation", "zip_code"])

### 0.1. DataPreparator

RePlay's internal data format is a Spark dataframe.
You can pass either a Spark or a pandas dataframe as input. The columns ``item_id`` and ``user_id`` are required for the interaction matrix.
Optional columns are ``relevance`` and the interaction ``timestamp``.

The ``DataPreparator`` class converts dataframes to Spark format and preprocesses the data: it renames or creates the required and optional interaction matrix columns, checks for nulls, and parses dates.

To convert a pandas dataframe to Spark as is, use the function ``convert2spark`` from ``replay.utils``.
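The prepared log shown further down carries `user_idx` and `item_idx` columns instead of the raw ids, which suggests that ids are re-encoded as integer indices during preparation. The sketch below illustrates that idea in plain Python on a toy log. The helper names `encode_ids` and `prepare` and the frequency-based ordering are assumptions made for illustration, not RePlay's actual implementation.

```python
from collections import Counter


def encode_ids(values):
    """Map raw ids to contiguous integer indices, most frequent id first (assumed ordering)."""
    counts = Counter(values)
    # Sort by descending frequency, then by id, so the mapping is deterministic.
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {raw: idx for idx, raw in enumerate(ordered)}


def prepare(rows):
    """Check required columns for nulls and replace raw ids with integer indices."""
    for row in rows:
        if row["user_id"] is None or row["item_id"] is None:
            raise ValueError(f"null id in interaction: {row}")
    user_index = encode_ids(r["user_id"] for r in rows)
    item_index = encode_ids(r["item_id"] for r in rows)
    return [
        {
            "user_idx": user_index[r["user_id"]],
            "item_idx": item_index[r["item_id"]],
            # Optional columns: fall back to a default when they are absent.
            "relevance": r.get("relevance", 1.0),
            "timestamp": r.get("timestamp"),
        }
        for r in rows
    ]


# Hypothetical toy interactions, not taken from MovieLens.
toy_log = [
    {"user_id": "u1", "item_id": "i9", "relevance": 5, "timestamp": 978300760},
    {"user_id": "u2", "item_id": "i9", "relevance": 3, "timestamp": 978302109},
    {"user_id": "u2", "item_id": "i7", "relevance": 4, "timestamp": 978301968},
]
prepared = prepare(toy_log)
```

In this toy log, `u2` and `i9` occur most often, so both receive index 0.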

In [7]:
preparator = DataPreparator()
log, _, _ = preparator(df)

22/02/27 23:04:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/27 23:04:23 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/02/27 23:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/02/27 23:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/02/27 23:04:23 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
                                                                                

In [8]:
log.show(3)

+---------+---------+--------+--------+
|relevance|timestamp|user_idx|item_idx|
+---------+---------+--------+--------+
|        5|978300760|    4131|      43|
|        3|978302109|    4131|     585|
|        3|978301968|    4131|     461|
+---------+---------+--------+--------+
only showing top 3 rows



In [9]:
users = convert2spark(users)
users.show(3)

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
+-------+------+---+----------+--------+
only showing top 3 rows



### 0.2. Split

RePlay provides data splitters that reproduce validation schemes widely used in recommender systems.

`UserSplitter` takes ``item_test_size`` items from each of ``user_test_size`` users and moves them to the test dataset.
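To make the splitting rule concrete, here is a minimal plain-Python sketch of a user-based split on a toy log. The function `user_split` is hypothetical and only mimics the idea (random users, random interactions per user); RePlay's `UserSplitter` works on Spark dataframes and also handles cold users and items, which this sketch ignores.

```python
import random
from collections import defaultdict


def user_split(log, item_test_size, user_test_size, seed):
    """Move `item_test_size` random interactions of `user_test_size` random users to test."""
    rng = random.Random(seed)
    by_user = defaultdict(list)
    for row in log:
        by_user[row[0]].append(row)

    # Pick the test users; sorting first keeps the result reproducible for a given seed.
    test_users = set(rng.sample(sorted(by_user), user_test_size))

    train, test = [], []
    for user in sorted(by_user):
        rows = list(by_user[user])
        if user in test_users:
            rng.shuffle(rows)
            test.extend(rows[:item_test_size])
            train.extend(rows[item_test_size:])
        else:
            train.extend(rows)
    return train, test


# Toy log of (user, item) pairs: 10 users with 8 interactions each.
toy_log = [(u, i) for u in range(10) for i in range(8)]
train_toy, test_toy = user_split(toy_log, item_test_size=2, user_test_size=3, seed=1234)
```

With 3 test users and 2 held-out interactions each, the toy test set has 6 rows. The same arithmetic explains the 2500 test rows in this notebook: 500 users times K = 5 items.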

In [10]:
splitter = UserSplitter(
    drop_cold_items=True,
    drop_cold_users=True,
    item_test_size=K,
    user_test_size=500,
    seed=SEED,
    shuffle=True
)
train, test = splitter.split(log)
print(train.count(), test.count())

22/02/27 23:04:37 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 23:04:38 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

997709 2500




## 1. Models training

#### SLIM

In [11]:
slim = SLIM(seed=SEED)

In [12]:
%%time

slim.fit(log=train)

                                                                                

CPU times: user 1.53 s, sys: 129 ms, total: 1.66 s
Wall time: 5.9 s


In [13]:
%%time

recs = slim.predict(
    k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=True
)



CPU times: user 23.1 ms, sys: 16.4 ms, total: 39.4 ms
Wall time: 1.77 s


In [14]:
recs.show(2)



+--------+--------+------------------+
|user_idx|item_idx|         relevance|
+--------+--------+------------------+
|      38|      73| 1.235672623556484|
|      38|     361|1.1715979128347436|
+--------+--------+------------------+
only showing top 2 rows



                                                                                

## 2. Models evaluation

RePlay implements several popular quality metrics for recommender systems. You can call a metric directly, or use the ``Experiment`` class to calculate a chosen set of metrics and compare models.
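For intuition about what these metrics measure, the sketch below computes HitRate@K, NDCG@K and Coverage@K in plain Python. It uses common textbook definitions with binary relevance; the function names are hypothetical and RePlay's own implementations may differ in details such as normalization.

```python
import math


def hit_rate_at_k(recs, truth, k):
    """Share of test users with at least one relevant item among their top-k."""
    hits = sum(1 for user, relevant in truth.items() if set(recs.get(user, [])[:k]) & relevant)
    return hits / len(truth)


def ndcg_at_k(recs, truth, k):
    """Mean binary-relevance NDCG: discounted gain divided by the best achievable gain."""
    total = 0.0
    for user, relevant in truth.items():
        top_k = recs.get(user, [])[:k]
        dcg = sum(1.0 / math.log2(rank + 2) for rank, item in enumerate(top_k) if item in relevant)
        idcg = sum(1.0 / math.log2(rank + 2) for rank in range(min(k, len(relevant))))
        total += dcg / idcg if idcg else 0.0
    return total / len(truth)


def coverage_at_k(recs, catalog, k):
    """Share of catalog items that appear in at least one top-k list."""
    recommended = {item for items in recs.values() for item in items[:k]}
    return len(recommended & catalog) / len(catalog)


# Toy data: ranked recommendations per user and held-out relevant items.
toy_recs = {"a": [1, 2, 3], "b": [4, 5, 6]}
toy_truth = {"a": {2}, "b": {9}}
toy_catalog = set(range(1, 11))

hr1 = hit_rate_at_k(toy_recs, toy_truth, 1)
hr3 = hit_rate_at_k(toy_recs, toy_truth, 3)
ndcg3 = ndcg_at_k(toy_recs, toy_truth, 3)
cov3 = coverage_at_k(toy_recs, toy_catalog, 3)
```

User `a` has the relevant item at rank 2, so it counts as a hit at K = 3 but not at K = 1, and contributes 1 / log2(3) to NDCG@3.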

In [15]:
metrics = Experiment(test, {NDCG(): K,
                            MAP() : K,
                            HitRate(): [1, K],
                            Coverage(train): K
                           })

In [16]:
%%time
metrics.add_result("SLIM", recs)
metrics.results

                                                                                

CPU times: user 360 ms, sys: 75.5 ms, total: 436 ms
Wall time: 47.5 s


| Model | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM | 0.16055 | 0.242 | 0.558 | 0.09372 | 0.165643 |


## 3. Hyperparameters optimization

#### 3.1 Search

In [17]:
# data split for hyperparameters optimization
train_opt, val_opt = splitter.split(train)

22/02/27 23:06:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 23:06:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [18]:
%%time
best_params = slim.optimize(train_opt, val_opt, criterion=NDCG(), k=K, budget=15)

[I 2022-02-27 23:06:17,681] A new study created in memory with name: no-name-b0d54335-8d37-401f-a916-3ba55ed9c932
[I 2022-02-27 23:06:51,535] Trial 0 finished with value: 0.18130037719542139 and parameters: {'beta': 0.01, 'lambda_': 0.01}. Best is trial 0 with value: 0.18130037719542139.
22/02/27 23:06:51 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:06:51 WARN CacheManager: Asked to cache already cached data.
[I 2022-02-27 23:07:31,090] Trial 1 finished with value: 0.18197356840108678 and parameters: {'beta': 0.003401392505408624, 'lambda_': 0.002240239840999655}. Best is trial 1 with value: 0.18197356840108678.
22/02/27 23:07:31 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:07:31 WARN CacheManager: Asked to cache already cached data.
[I 2022-02-27 23:07:56,590] Trial 2 finished with value: 0.10199049759426765 and parameters: {'beta': 1.9301997111553214e-05, 'lambda_': 1.1554917603144903}. Best i

CPU times: user 24.9 s, sys: 2.73 s, total: 27.6 s
Wall time: 6min 52s


In [19]:
best_params

{'beta': 0.11351011099824757, 'lambda_': 2.678667716748947e-06}
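The trial log above shows Optuna sampling `beta` and `lambda_` across several orders of magnitude and keeping the best-scoring trial within the given budget. The sketch below reproduces that loop with plain random search on a log scale. It is a simplified stand-in, not Optuna's sampler; the search ranges and the toy objective are assumptions for illustration, whereas in this notebook the objective is NDCG@K on `val_opt`.

```python
import math
import random


def log_uniform(rng, low, high):
    """Draw a value whose logarithm is uniformly distributed between log(low) and log(high)."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


def random_search(objective, space, budget, seed):
    """Evaluate `budget` random parameter sets and return the best one (maximization)."""
    rng = random.Random(seed)
    best_params, best_value = None, -math.inf
    for _ in range(budget):
        params = {name: log_uniform(rng, low, high) for name, (low, high) in space.items()}
        value = objective(params)
        if value > best_value:
            best_params, best_value = params, value
    return best_params, best_value


def toy_objective(params):
    """Hypothetical score that peaks at beta = 0.1 and lambda_ = 1e-5."""
    return -(math.log10(params["beta"]) + 1) ** 2 - (math.log10(params["lambda_"]) + 5) ** 2


# Assumed search ranges; the real ranges are defined inside the model.
toy_space = {"beta": (1e-6, 5.0), "lambda_": (1e-6, 2.0)}
toy_best_params, toy_best_value = random_search(toy_objective, toy_space, budget=15, seed=1234)
```

Sampling on a log scale matters here because regularization strengths such as `lambda_` can be useful anywhere between 1e-6 and 1; uniform sampling would almost never try the small values.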

#### 3.2 Compare with previous

In [20]:
def fit_predict_evaluate(model, experiment, name):
    model.fit(log=train)

    recs = model.predict(
        k=K,
        users=test.select('user_idx').distinct(),
        log=train,
        filter_seen_items=True
    )

    experiment.add_result(name, recs)
    return recs

In [21]:
%%time
recs = fit_predict_evaluate(SLIM(**best_params, seed=SEED), metrics, 'SLIM_optimized')
metrics.results.sort_values('NDCG@5', ascending=False)

22/02/27 23:13:15 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:13:15 WARN CacheManager: Asked to cache already cached data.

CPU times: user 1.9 s, sys: 284 ms, total: 2.18 s
Wall time: 52.9 s




| Model | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.147598 | 0.24 | 0.57 | 0.095547 | 0.168684 |
| SLIM | 0.16055 | 0.242 | 0.558 | 0.09372 | 0.165643 |


### Convert to pandas

In [22]:
recs_pd = recs.toPandas()
recs_pd.head(2)


|   | user_idx | item_idx | relevance |
|---|---|---|---|
| 0 | 38 | 73 | 1.230351 |
| 1 | 38 | 361 | 1.212302 |


## 4. Save and load

RePlay can save and load fitted models with the `save` and `load` functions of the `model_handler` module. A model is saved as a folder with all necessary parameters and data.

In [23]:
save(slim, path='./slim_best_params')
slim_loaded = load('./slim_best_params')

                                                                                

In [24]:
%%time
pred_from_loaded = slim_loaded.predict(k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=True)
pred_from_loaded.show(2)



+--------+--------+------------------+
|user_idx|item_idx|         relevance|
+--------+--------+------------------+
|      38|      14|1.1936188460415138|
|      38|      73|1.1193345759515603|
+--------+--------+------------------+
only showing top 2 rows

CPU times: user 67 ms, sys: 3.66 ms, total: 70.7 ms
Wall time: 13.1 s




In [25]:
slim_loaded.beta, slim_loaded.lambda_

(0.11351011099824757, 2.678667716748947e-06)

## 5. Other RePlay models

#### ALS
A commonly used matrix factorization algorithm.

In [26]:
%%time
recs = fit_predict_evaluate(ALSWrap(rank=100, seed=SEED), metrics, 'ALS')
metrics.results.sort_values('NDCG@5', ascending=False)

22/02/27 23:14:46 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:14:46 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:14:50 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
22/02/27 23:14:50 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
22/02/27 23:14:50 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
22/02/27 23:14:50 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
22/02/27 23:15:01 WARN DAGScheduler: Broadcasting large task binary with size 1004.5 KiB
22/02/27 23:15:02 WARN DAGScheduler: Broadcasting large task binary with size 1047.0 KiB
22/02/27 23:15:03 WARN DAGScheduler: Broadcasting large task binary with size 1005.4 KiB
22/02/27 23:15:03 WARN DAGScheduler: Broadcasting large task binary with size 1089.4 KiB
22/02/27 23:15:03 WARN DAGScheduler: Broadcasting large task binary 

CPU times: user 437 ms, sys: 130 ms, total: 566 ms
Wall time: 1min 44s


| Model | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.147598 | 0.24 | 0.57 | 0.095547 | 0.168684 |
| SLIM | 0.16055 | 0.242 | 0.558 | 0.09372 | 0.165643 |
| ALS | 0.195359 | 0.216 | 0.54 | 0.0916 | 0.160843 |


#### KNN
A commonly used item-based recommender.

In [27]:
%%time
recs = fit_predict_evaluate(KNN(num_neighbours=100), metrics, 'KNN')
metrics.results.sort_values('NDCG@5', ascending=False)

22/02/27 23:16:31 WARN CacheManager: Asked to cache already cached data.
22/02/27 23:16:31 WARN CacheManager: Asked to cache already cached data.

CPU times: user 283 ms, sys: 90.3 ms, total: 374 ms
Wall time: 1min 7s


| Model | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.147598 | 0.24 | 0.57 | 0.095547 | 0.168684 |
| SLIM | 0.16055 | 0.242 | 0.558 | 0.09372 | 0.165643 |
| ALS | 0.195359 | 0.216 | 0.54 | 0.0916 | 0.160843 |
| KNN | 0.052348 | 0.144 | 0.384 | 0.054447 | 0.101923 |


## 6. Compare RePlay models with others
To evaluate recommendations obtained from other sources, read them and pass them to ``Experiment``.

In [29]:
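# `recs` stands in for externally produced recommendations here;
# its metrics below are identical to the KNN row.
metrics.add_result("my_model", recs)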
metrics.results.sort_values("NDCG@5", ascending=False)

                                                                                

| Model | Coverage@5 | HitRate@1 | HitRate@5 | MAP@5 | NDCG@5 |
|---|---|---|---|---|---|
| SLIM_optimized | 0.147598 | 0.24 | 0.57 | 0.095547 | 0.168684 |
| SLIM | 0.16055 | 0.242 | 0.558 | 0.09372 | 0.165643 |
| ALS | 0.195359 | 0.216 | 0.54 | 0.0916 | 0.160843 |
| KNN | 0.052348 | 0.144 | 0.384 | 0.054447 | 0.101923 |
| my_model | 0.052348 | 0.144 | 0.384 | 0.054447 | 0.101923 |
