{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# This notebook does the following:\n", "1. Load Criteo Terabyte Click Logs Day 15 as Dask cuDF\n", "2. Process and format data\n", "3. Train a random forest model using GPUs by leveraging cuML\n", "4. Perform prediction & calculate accuracy" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# install necessary libraries\n", "\n", "#!pip install dask_cudf\n", "#!pip install dask_ml\n", "#!pip install cuml --upgrade\n", "\n", "import cuml" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "# read data as Dask df\n", "\n", "from dask.distributed import Client, progress, wait\n", "import dask.dataframe as dd" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "client = Client()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "client\n", "workers = client.has_what().keys()\n", "n_workers = len(workers)\n", "n_streams = 8 # Performance optimization" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Refer to the Dask DataFrame API documentation for the available data processing operations:\n", "https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe\n", "\n", "Note that we are using Dask cuDF (`dask_cudf`) to load the data in the next cell." ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import dask_cudf\n", "import numpy as np\n", "import cudf\n", "\n", "file = '/data/day_15' \n", "header = ['col'+str(i) for i in range (1,41)] #note that according to criteo, the first column in the dataset is Click Through (CT). Consist of 40 columns \n", "gdf_original = dask_cudf.read_csv(file, delimiter='\\t', names=header) " ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 3
  • \n", "
  • Cores: 3
  • \n", "
  • Memory: 354.39 GB
  • \n", "
\n", "
" ], "text/plain": [ "" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'tcp://172.17.1.221:35641': None,\n", " 'tcp://172.17.1.232:46093': None,\n", " 'tcp://172.17.2.23:35103': None}" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.run(cudf.set_allocator, \"managed\") # Uses managed memory instead of \"default\"" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
col1col2col3col4col5col6col7col8col9col10...col31col32col33col34col35col36col37col38col39col40
002.09.0<NA>1.0<NA>0.00.031...1f7fc70bb8170bba9512c20b31a9f3b3228aee9bb74c654859f9dd38165fbf320b3c06d02ccea557
1012.0166.03.03.0<NA>1.00.013...d20856aab6bc86c5108a0699e7ef7c20113b1789670bb82a0c427c16fc6fc912991321ea2997ef88
201.066.0<NA><NA><NA><NA><NA>20...753da5f3b8170bba9512c20b1a0af64813b96cbc3f2bae22209c86ee165fbf32ff6548022ccea557
301.0<NA><NA><NA><NA><NA><NA>21...1f7fc70bb8170bba7a7178b20da1444bcf12754eaf22e988c483d0dd75350c8a57e36578ed10571d
402.0<NA>4.04.0<NA>7.00.0594...d20856aaa1eb15119512c20b44fa1260c59d0ef0c41079d638d2af5237dcf7a2ff654802b757e957
\n", "

5 rows × 40 columns

\n", "
" ], "text/plain": [ " col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 ... \\\n", "0 0 2.0 9.0 1.0 0.0 0.0 3 1 ... \n", "1 0 12.0 166.0 3.0 3.0 1.0 0.0 1 3 ... \n", "2 0 1.0 66.0 2 0 ... \n", "3 0 1.0 2 1 ... \n", "4 0 2.0 4.0 4.0 7.0 0.0 59 4 ... \n", "\n", " col31 col32 col33 col34 col35 col36 col37 \\\n", "0 1f7fc70b b8170bba 9512c20b 31a9f3b3 228aee9b b74c6548 59f9dd38 \n", "1 d20856aa b6bc86c5 108a0699 e7ef7c20 113b1789 670bb82a 0c427c16 \n", "2 753da5f3 b8170bba 9512c20b 1a0af648 13b96cbc 3f2bae22 209c86ee \n", "3 1f7fc70b b8170bba 7a7178b2 0da1444b cf12754e af22e988 c483d0dd \n", "4 d20856aa a1eb1511 9512c20b 44fa1260 c59d0ef0 c41079d6 38d2af52 \n", "\n", " col38 col39 col40 \n", "0 165fbf32 0b3c06d0 2ccea557 \n", "1 fc6fc912 991321ea 2997ef88 \n", "2 165fbf32 ff654802 2ccea557 \n", "3 75350c8a 57e36578 ed10571d \n", "4 37dcf7a2 ff654802 b757e957 \n", "\n", "[5 rows x 40 columns]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf_original.head()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "col1 int64\n", "col2 float64\n", "col3 float64\n", "col4 float64\n", "col5 float64\n", "col6 float64\n", "col7 float64\n", "col8 float64\n", "col9 int64\n", "col10 int64\n", "col11 float64\n", "col12 float64\n", "col13 float64\n", "col14 float64\n", "dtype: object" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf_sliced = gdf_original.iloc[:, 0:14]\n", "# gdf_sliced_small = gdf_sliced.sample(frac=0.1)\n", "gdf_sliced_small = gdf_sliced\n", "gdf_sliced.dtypes" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
col1col2col3col4col5col6col7col8col9col10col11col12col13col14
002.09.0<NA>1.0<NA>0.00.0310.0<NA>1036.0<NA>
1012.0166.03.03.0<NA>1.00.0131.0<NA>28.03.0
201.066.0<NA><NA><NA><NA><NA>20<NA><NA>1211.0<NA>
301.0<NA><NA><NA><NA><NA><NA>21<NA><NA>8.0<NA>
402.0<NA>4.04.0<NA>7.00.05941.0<NA>378.04.0
\n", "
" ], "text/plain": [ " col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 col11 col12 \\\n", "0 0 2.0 9.0 1.0 0.0 0.0 3 1 0.0 \n", "1 0 12.0 166.0 3.0 3.0 1.0 0.0 1 3 1.0 \n", "2 0 1.0 66.0 2 0 \n", "3 0 1.0 2 1 \n", "4 0 2.0 4.0 4.0 7.0 0.0 59 4 1.0 \n", "\n", " col13 col14 \n", "0 1036.0 \n", "1 28.0 3.0 \n", "2 1211.0 \n", "3 8.0 \n", "4 378.0 4.0 " ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf_sliced_small.head()" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF\n", "from cuml.dask.common import utils as dask_utils" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Refer to Official Dask Documentation for Best Practices on repartitioning your Dask Dataframe:\n", "https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# You should aim for partitions that have around 100MB of data each.\n", "gdf_sliced_small = gdf_sliced_small.astype(np.float32).repartition(npartitions=450) \n", "\n", "# gdf = gdf.persist() # if on a distributed system" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "gdf_sliced_small = gdf_sliced_small.fillna(0)" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "col1 float32\n", "col2 float32\n", "col3 float32\n", "col4 float32\n", "col5 float32\n", "col6 float32\n", "col7 float32\n", "col8 float32\n", "col9 float32\n", "col10 float32\n", "col11 float32\n", "col12 float32\n", "col13 float32\n", "col14 float32\n", "dtype: object" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf_sliced_small.dtypes" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { 
"text/plain": [ "" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# split data into training features (left in gdf_sliced_small) and labels (Y)\n", "Y = gdf_sliced_small.pop('col1') # first column is binary (click or not)\n", "Y = Y.astype(np.int32)\n", "Y" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 4.1 s, sys: 825 ms, total: 4.92 s\n", "Wall time: 50min 46s\n" ] }, { "data": { "text/plain": [ "" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "%%time\n", "\n", "# Random Forest building parameters\n", "n_streams = 8 # optimization\n", "max_depth = 10\n", "n_bins = 16\n", "n_trees = 10\n", "\n", "cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams, verbose=True, client=client)\n", "\n", "cuml_model.fit(gdf_sliced_small, Y)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
col2col3col4col5col6col7col8col9col10col11col12col13col14
npartitions=450
float32float32float32float32float32float32float32float32float32float32float32float32float32
.......................................
..........................................
.......................................
.......................................
\n", "
\n", "
Dask Name: drop_by_shallow_copy, 2235 tasks
" ], "text/plain": [ "" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Build a small test set (features gdf_test, labels test_y) from the first rows of the raw data.\n", "# The frame has an integer index (see the head() output), so slice with integer labels\n", "# rather than the string labels '0':'6000'.\n", "# NOTE(review): .loc slices are inclusive, and with unknown divisions Dask may apply the\n", "# slice per partition - confirm the resulting test-set size before relying on it.\n", "gdf_test = gdf_original.loc[0:6000]\n", "gdf_test = gdf_test.iloc[:, 0:14]\n", "gdf_test = gdf_test.astype(np.float32).repartition(npartitions=450)\n", "gdf_test = gdf_test.fillna(0)\n", "test_y = gdf_test.pop('col1') # first column is binary (click or not)\n", "test_y = test_y.astype(np.int32)\n", "gdf_test" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Model prediction\n", "# NOTE(review): this predicts on gdf_sliced_small, the same features the model was fitted on,\n", "# so the accuracy reported below is training accuracy; gdf_test / test_y are not used.\n", "pred_df = cuml_model.predict(gdf_sliced_small)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# converting from Dask cuDF Series to NumPy array\n", "# (stored under a new name so this cell can be re-run without losing the Dask object)\n", "pred_np = pred_df.compute().to_array()\n", "pred_np" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# converting from Dask cuDF Series to NumPy array\n", "# (stored under a new name so Y stays a Dask series and this cell can be re-run)\n", "y_true = Y.compute().to_array()\n", "y_true" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn import metrics\n", "# Model Accuracy (labels vs. predictions, both as NumPy arrays)\n", "print(\"Accuracy:\",metrics.accuracy_score(y_true, pred_np))" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" } }, "nbformat": 4, "nbformat_minor": 4 }