In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
%config Completer.use_jedi = False

In [3]:
import warnings
from optuna.exceptions import ExperimentalWarning
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=ExperimentalWarning)

In [4]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import array_contains, col, explode, split, substring

from replay.data_preparator import DataPreparator
from replay.experiment import Experiment
from replay.metrics import HitRate, NDCG, MAP, Coverage
from replay.models import LightFMWrap
from replay.session_handler import State
from replay.splitters import UserSplitter
from rs_datasets import MovieLens

In [5]:
K=10
SEED=1234

# The notebook contains an example of LightFM model usage and dataset preprocessing with RePlay, including:
1. Data loading
2. Features preprocessing with pyspark
3. Building LightFM model based on interaction matrix and features
4. Model evaluation

# 1) Data loading

We will use MovieLens 10m dataset from rs_datasets package, which contains a list of recommendations datasets.

In [6]:
data = MovieLens("10m")
data.info()

ratings


Unnamed: 0,user_id,item_id,rating,timestamp
0,1,122,5.0,838985046
1,1,185,5.0,838983525
2,1,231,5.0,838983392



items


Unnamed: 0,item_id,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance



tags


Unnamed: 0,user_id,item_id,tag,timestamp
0,15,4973,excellent!,1215184630
1,20,1747,politics,1188263867
2,20,1747,satire,1188263867





### Convert interaction log to RePlay format

In [7]:
preparator = DataPreparator()
log, _, item_features = preparator(data.ratings, item_features=data.items, mapping={"relevance": "rating"})

22/02/27 22:14:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/27 22:14:17 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/02/27 22:14:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/02/27 22:14:18 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/02/27 22:14:23 WARN TaskSetManager: Stage 0 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB.
22/02/27 22:14:27 WARN TaskSetManager: Stage 2 contains a task of very large size (4073 KiB). The maximum recom

### Data split

In [8]:
user_random_splitter = UserSplitter(
    item_test_size=K,
    user_test_size=500,
    drop_cold_items=True,
    drop_cold_users=True,
    shuffle=True,
    seed=SEED
)

In [9]:
train, test = user_random_splitter.split(log)
train.count(), test.count()

22/02/27 22:14:47 WARN DAGScheduler: Broadcasting large task binary with size 2004.4 KiB
22/02/27 22:14:47 WARN TaskSetManager: Stage 10 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB.
22/02/27 22:14:49 WARN DAGScheduler: Broadcasting large task binary with size 2011.1 KiB
22/02/27 22:14:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 22:14:50 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 22:14:51 WARN DAGScheduler: Broadcasting large task binary with size 2.2 MiB
22/02/27 22:14:51 WARN TaskSetManager: Stage 15 contains a task of very large size (4073 KiB). The maximum recommended task size is 1000 KiB.
22/02/27 22:14:51 WARN DAGScheduler: Broadcasting large task binary with size 2004.7 KiB
22/02/27 22:14:52 WARN TaskSetMan

(9995054, 5000)

In [10]:
train_opt, val_opt = user_random_splitter.split(train)
train_opt.count(), val_opt.count()

22/02/27 22:15:18 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 22:15:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
22/02/27 22:15:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:21 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:21 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:26 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:27 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:34 WARN DAGScheduler: Broadcasting large task binary 

(9990054, 5000)

# 2) Features preprocessing with pyspark

#### Year

In [11]:
year = item_features.withColumn('year', substring(col('title'), -5, 4).astype(IntegerType())).select('item_idx', 'year')
year.show(2)

+--------+----+
|item_idx|year|
+--------+----+
|      11|1995|
|     117|1995|
+--------+----+
only showing top 2 rows



#### Genres

In [12]:
genres = (
    State().session.createDataFrame(data.items[["item_id", "genres"]].rename({'item_id': 'item_idx'}, axis=1))
    .select(
        "item_idx",
        split("genres", "\|").alias("genres")
    )
)

In [13]:
genres.show()

+--------+--------------------+
|item_idx|              genres|
+--------+--------------------+
|       1|[Adventure, Anima...|
|       2|[Adventure, Child...|
|       3|   [Comedy, Romance]|
|       4|[Comedy, Drama, R...|
|       5|            [Comedy]|
|       6|[Action, Crime, T...|
|       7|   [Comedy, Romance]|
|       8|[Adventure, Child...|
|       9|            [Action]|
|      10|[Action, Adventur...|
|      11|[Comedy, Drama, R...|
|      12|    [Comedy, Horror]|
|      13|[Animation, Child...|
|      14|             [Drama]|
|      15|[Action, Adventur...|
|      16|      [Crime, Drama]|
|      17|[Comedy, Drama, R...|
|      18|[Comedy, Drama, T...|
|      19|            [Comedy]|
|      20|[Action, Comedy, ...|
+--------+--------------------+
only showing top 20 rows



In [14]:
genres_list = (
    genres.select(explode("genres").alias("genre"))
    .distinct().filter('genre <> "(no genres listed)"')
    .toPandas()["genre"].tolist()
)

In [15]:
genres_list

['Documentary',
 'IMAX',
 'Adventure',
 'Animation',
 'Comedy',
 'Thriller',
 'Sci-Fi',
 'Musical',
 'Horror',
 'Action',
 'Fantasy',
 'War',
 'Mystery',
 'Drama',
 'Film-Noir',
 'Crime',
 'Western',
 'Romance',
 'Children']

In [16]:
item_features = genres
for genre in genres_list:
    item_features = item_features.withColumn(
        genre,
        array_contains(col("genres"), genre).astype(IntegerType())
    )
item_features = item_features.drop("genres").cache()
item_features.count()

10681

In [17]:
item_features = item_features.join(year, on='item_idx', how='inner')
item_features.cache()
item_features.count()

8316

# 3) Building LightFM model based on interaction matrix and features

In [18]:
model_feat = LightFMWrap(random_state=SEED, loss='warp', no_components=128)

In [19]:
%%time
model_feat.fit(train, item_features=item_features)

22/02/27 22:15:44 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:45 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:49 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:49 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:50 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:50 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:52 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:54 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:55 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:57 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:15:58 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:16:00 WARN DAGScheduler: Broadcasting larg

CPU times: user 13h 4min 1s, sys: 53.2 s, total: 13h 4min 54s
Wall time: 17min 29s


In [20]:
%%time
recs = model_feat.predict(
    k=K,
    users=test.select('user_idx').distinct(),
    log=train,
    filter_seen_items=True,
    item_features=item_features
)

22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:14 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:15 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:16 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:17 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:19 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:20 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:20 WARN DAGScheduler: Broadcasting larg

CPU times: user 205 ms, sys: 247 ms, total: 452 ms
Wall time: 23.1 s


# 4) Model evaluation

In [21]:
metrics = Experiment(test, {NDCG(): K,
                            MAP() : K,
                            HitRate(): [1, K],
                           Coverage(train): K})
 

22/02/27 22:33:37 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:39 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
                                                                                

In [22]:
metrics.add_result("LightFM_item_features", recs)
metrics.results

22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:40 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.3 MiB
22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:42 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:47 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:55 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:57 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
22/02/27 22:33:57 WARN DAGScheduler: Broadcasting larg

Unnamed: 0,Coverage@10,HitRate@1,HitRate@10,MAP@10,NDCG@10
LightFM_item_features,0.066404,0.348,0.784,0.121321,0.229198
