# This notebook does the following:
1. Load Criteo Terabyte Click Logs Day 15 as Dask cuDF
2. Process and format data
3. Train a random forest model using GPUs by leveraging cuML
4. Perform prediction & calculate accuracy

In [1]:
# install necessary libraries
# (the install commands below are commented out; uncomment them only if the
# packages are missing from the kernel's environment)

#!pip install dask_cudf
#!pip install dask_ml
#!pip install cuml --upgrade

import cuml

In [2]:
# Dask imports: the distributed client helpers and the Dask DataFrame API.
# (No data is read in this cell; the CSV is loaded in a later cell.)

from dask.distributed import Client, progress, wait
import dask.dataframe as dd

In [3]:
# Create the Dask distributed client used for all cluster work in this notebook.
# No scheduler address is passed here -- presumably it is picked up from the
# Dask configuration/environment (TODO confirm). The client repr shown in a
# later cell reports scheduler tcp://rapidsai-scheduler:8786 with 3 workers.
client = Client()

In [4]:
# NOTE: this bare `client` is not the last expression of the cell, so Jupyter
# does not render it; it has no effect here.
client
# has_what() is keyed by worker, so counting its keys counts the workers.
workers = client.has_what().keys()
# NOTE(review): n_workers is not referenced by any later cell visible in this notebook.
n_workers = len(workers)
# NOTE: n_streams is assigned again (same value) in the training cell.
n_streams = 8 # Performance optimization

Refer to Dask Dataframe API documentation for various data processing operations:
https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe
Note that we are using `dask_cudf` (Dask DataFrames backed by cuDF on the GPU) in this notebook.

In [5]:
import dask_cudf
import numpy as np
import cudf

# Load the Criteo day-15 click log lazily as a Dask cuDF DataFrame.
# The file is tab-delimited and the 40 column names (col1..col40) are supplied
# explicitly; according to Criteo the first column is the click-through (CT) label.
file = '/data/day_15'
header = [f'col{idx}' for idx in range(1, 41)]
gdf_original = dask_cudf.read_csv(file, names=header, delimiter='\t')

In [6]:
# Display the client summary (scheduler address, dashboard link, workers, cores, memory).
client

0,1
Client  Scheduler: tcp://rapidsai-scheduler:8786  Dashboard: /proxy/rapidsai-scheduler:8787/status,Cluster  Workers: 3  Cores: 3  Memory: 354.39 GB


In [7]:
# Call cudf.set_allocator("managed") on the workers via client.run; the returned
# dict maps each worker address to that call's return value (None here).
# NOTE(review): cudf.set_allocator may be deprecated/removed in newer RAPIDS
# releases -- confirm against the installed cuDF version.
client.run(cudf.set_allocator, "managed")  # Uses managed memory instead of "default"

{'tcp://172.17.1.221:35641': None,
 'tcp://172.17.1.232:46093': None,
 'tcp://172.17.2.23:35103': None}

In [8]:
# Peek at the first rows of the raw frame to sanity-check the parsed columns.
gdf_original.head()

Unnamed: 0,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,...,col31,col32,col33,col34,col35,col36,col37,col38,col39,col40
0,0,2.0,9.0,,1.0,,0.0,0.0,3,1,...,1f7fc70b,b8170bba,9512c20b,31a9f3b3,228aee9b,b74c6548,59f9dd38,165fbf32,0b3c06d0,2ccea557
1,0,12.0,166.0,3.0,3.0,,1.0,0.0,1,3,...,d20856aa,b6bc86c5,108a0699,e7ef7c20,113b1789,670bb82a,0c427c16,fc6fc912,991321ea,2997ef88
2,0,1.0,66.0,,,,,,2,0,...,753da5f3,b8170bba,9512c20b,1a0af648,13b96cbc,3f2bae22,209c86ee,165fbf32,ff654802,2ccea557
3,0,1.0,,,,,,,2,1,...,1f7fc70b,b8170bba,7a7178b2,0da1444b,cf12754e,af22e988,c483d0dd,75350c8a,57e36578,ed10571d
4,0,2.0,,4.0,4.0,,7.0,0.0,59,4,...,d20856aa,a1eb1511,9512c20b,44fa1260,c59d0ef0,c41079d6,38d2af52,37dcf7a2,ff654802,b757e957


In [9]:
# Keep only the first 14 columns (col1..col14): the label plus 13 features.
gdf_sliced = gdf_original.iloc[:, :14]
# The full slice is used as-is. To experiment on a 10% sample instead, replace
# the assignment below with: gdf_sliced.sample(frac=0.1)
gdf_sliced_small = gdf_sliced
# Show the column dtypes of the slice.
gdf_sliced.dtypes

col1       int64
col2     float64
col3     float64
col4     float64
col5     float64
col6     float64
col7     float64
col8     float64
col9       int64
col10      int64
col11    float64
col12    float64
col13    float64
col14    float64
dtype: object

In [10]:
# Preview the 14-column slice (missing values are still present at this point).
gdf_sliced_small.head()

Unnamed: 0,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14
0,0,2.0,9.0,,1.0,,0.0,0.0,3,1,0.0,,1036.0,
1,0,12.0,166.0,3.0,3.0,,1.0,0.0,1,3,1.0,,28.0,3.0
2,0,1.0,66.0,,,,,,2,0,,,1211.0,
3,0,1.0,,,,,,,2,1,,,8.0,
4,0,2.0,,4.0,4.0,,7.0,0.0,59,4,1.0,,378.0,4.0


In [11]:
from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from cuml.dask.common import utils as dask_utils

Refer to Official Dask Documentation for Best Practices on repartitioning your Dask Dataframe:
https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead

In [12]:
# Cast every column to float32, then rebalance into 450 partitions.
# Rule of thumb from the Dask best-practices guide: aim for roughly 100MB per partition.
gdf_sliced_small = gdf_sliced_small.astype(np.float32)
gdf_sliced_small = gdf_sliced_small.repartition(npartitions=450)

# Optional on a distributed system: call .persist() on the frame to keep it
# in cluster memory (left disabled here).

In [13]:
# Replace missing values with 0 in every column.
gdf_sliced_small = gdf_sliced_small.fillna(0)

In [14]:
# Confirm that every column is float32 after the cast above.
gdf_sliced_small.dtypes

col1     float32
col2     float32
col3     float32
col4     float32
col5     float32
col6     float32
col7     float32
col8     float32
col9     float32
col10    float32
col11    float32
col12    float32
col13    float32
col14    float32
dtype: object

In [15]:
# Separate the label from the features: col1 is the binary click indicator.
# pop() removes col1 from gdf_sliced_small in place, so this cell cannot be
# re-run without first rebuilding gdf_sliced_small.
Y = gdf_sliced_small.pop('col1').astype(np.int32)
Y

<dask_cudf.Series | 2508 tasks | 450 npartitions>

In [16]:
%%time

# Random Forest building parameters
n_streams = 8 # optimization
max_depth = 10
n_bins = 16
n_trees = 10

cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams, verbose=True, client=client)

cuml_model.fit(gdf_sliced_small, Y)

CPU times: user 4.1 s, sys: 825 ms, total: 4.92 s
Wall time: 50min 46s


<cuml.dask.ensemble.randomforestclassifier.RandomForestClassifier at 0x7fbb7a88a250>

In [25]:
# Build a small evaluation frame (gdf_test) and its labels (test_y) from the
# raw data, preprocessed the same way as the training frame.
#
# NOTE(review): the model was fitted on a frame derived from all of
# gdf_original, so these rows were also seen during training and are not a
# true hold-out set. The later prediction/accuracy cells use gdf_sliced_small
# and Y, not gdf_test and test_y.
#
# The frame's index is integer-valued (see the head() output), so the slice
# uses integer labels; string labels such as '0' would not match the index
# dtype, and because Dask is lazy the mismatch would only surface at compute.
# NOTE(review): .loc is label-based and includes the end label. If each CSV
# partition carries its own 0-based index, this selects rows from every
# partition rather than only the first 6001 rows -- confirm.
gdf_test = gdf_original.loc[0:6000]
gdf_test = gdf_test.iloc[:, 0:14]
# NOTE(review): 450 partitions mirrors the training frame but is a lot for a
# frame this small -- consider a lower count.
gdf_test = gdf_test.astype(np.float32).repartition(npartitions=450)
gdf_test = gdf_test.fillna(0)
test_y = gdf_test.pop('col1') # first column is binary (click or not)
test_y = test_y.astype(np.int32)
gdf_test

Unnamed: 0_level_0,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14
npartitions=450,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
,float32,float32,float32,float32,float32,float32,float32,float32,float32,float32,float32,float32,float32
,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...


In [None]:
%%time

# Model prediction
# NOTE(review): gdf_sliced_small is the same frame that was passed to fit(), so
# these are predictions on the training data; gdf_test / test_y built in the
# previous cell are not used here.
pred_df = cuml_model.predict(gdf_sliced_small)

In [None]:
# converting from Dask cuDF Series to NumPy array
# NOTE: pred_df is rebound from a lazy Dask series to a host array, so this
# cell cannot be re-run without re-running the prediction cell first.
pred_df = pred_df.compute().to_array()
pred_df

In [None]:
# Materialize the label series on the host as an array so it can be compared
# with the predictions.
# NOTE: Y is rebound from a lazy Dask series to a host array, so this cell
# cannot be re-run without re-running the label-split cell first.
Y = Y.compute().to_array()
Y

In [None]:
from sklearn import metrics

# Fraction of rows where the predicted label equals the true label.
# Y and pred_df both come from the frame the model was trained on, so this
# figure is the training accuracy.
accuracy = metrics.accuracy_score(Y, pred_df)
print("Accuracy:", accuracy)