# Introduction

This notebook gives a short introduction to using Dask to parallelize model training, particularly when you have multiple learning tasks and want to train a separate model for each one.

For brevity, I will not elaborate on the machine learning task itself, and will instead focus on the Dask idioms that we need for it.

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

from dask.distributed import LocalCluster, Client
import numpy as np
import pandas as pd
import janitor

## Instantiate a Dask Cluster

Here, we connect a `client` to a Dask cluster. Calling `Client()` with no arguments starts a `LocalCluster` behind the scenes; other cluster types, such as an `SGECluster` or `KubeCluster`, can be created too.

In [2]:
client = Client()

Port 8787 is already in use. 
Perhaps you already have a cluster running?
Hosting the diagnostics dashboard on a random port instead.
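If you would rather create the cluster explicitly and then hand it to the client, a minimal sketch looks like the following. The worker count, the in-process workers (`processes=False`), and the disabled dashboard (`dashboard_address=None`) are assumptions chosen to keep the example light; they are not the settings used in the rest of this notebook.

```python
from dask.distributed import Client, LocalCluster

# Assumed settings for illustration: two in-process workers, no dashboard.
cluster = LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    dashboard_address=None,
)

# Connect a client to the cluster we just created.
client = Client(cluster)
client.wait_for_workers(2)

# Ask the scheduler how many workers it knows about.
worker_count = len(client.scheduler_info()["workers"])

# Shut everything down again.
client.close()
cluster.close()
```

Swapping `LocalCluster` for another cluster class should leave the `Client(cluster)` line unchanged.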


## Data Preprocessing

We will now preprocess our data and get it into a shape suitable for machine learning.

In [4]:
from utils import molecular_weights, featurize_sequence_

In [6]:
drugs = ['ATV', 'DRV', 'FPV', 'IDV', 'LPV', 'NFV', 'SQV', 'TPV']

data = (
    pd.read_csv("data/hiv-protease-data-expanded.csv", index_col=0)
    .query("weight == 1.0")
    .transform_column("sequence", len, "seq_length")
    .query("seq_length == 99")
    .transform_column("sequence", featurize_sequence_, "features")
    .transform_columns(drugs, np.log10)
)

features = pd.DataFrame(np.vstack(data['features'])).set_index(data.index)
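The chain above relies on pyjanitor and on `featurize_sequence_` from `utils`, whose source is not shown here. As a rough sketch of the same steps in plain pandas, I assume that the featurizer maps each residue to its molecular weight, which is what the `features.head(3)` output further down suggests. `WEIGHTS`, `featurize_sketch`, and the toy sequences are hypothetical stand-ins, and the weights are copied from that output.

```python
import numpy as np
import pandas as pd

# Molecular weights copied from the feature output in this notebook;
# only the residues needed for the toy sequences are included.
WEIGHTS = {
    "P": 115.131,
    "Q": 146.1451,
    "I": 131.1736,
    "T": 119.1197,
    "L": 131.1736,
    "W": 204.2262,
}


def featurize_sketch(sequence):
    """Hypothetical stand-in for featurize_sequence_: array of shape (1, n)."""
    return np.array([[WEIGHTS[aa] for aa in sequence]])


toy = pd.DataFrame(
    {"sequence": ["PQITLW", "PQIT", "PQLTLW"], "ATV": [10.0, 100.0, 1.0]},
    index=[6, 7, 14],
)

# Keep only sequences of the expected length (6 here, 99 in the real data).
toy = toy.assign(seq_length=toy["sequence"].str.len()).query("seq_length == 6")

# Featurize each sequence and log10-transform the drug column.
toy = toy.assign(
    features=toy["sequence"].apply(featurize_sketch),
    ATV=np.log10(toy["ATV"]),
)

# Stack the per-row (1, n) arrays into one feature matrix that shares the index.
toy_features = pd.DataFrame(np.vstack(toy["features"].to_list())).set_index(toy.index)
```

The final line mirrors how `features` is built above: one row per sequence, one column per position, indexed like `data`.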

In [7]:
data.head(3)

Unnamed: 0,ATV,DRV,FPV,IDV,LPV,NFV,SQV,SeqID,TPV,seqid,sequence,sequence_object,weight,seq_length,features
6,1.50515,,0.477121,1.544068,1.50515,1.462398,2.214844,4426,,4426-0,PQITLWQRPIVTIKIGGQLKEALLDTGADDTVLEEMNLPGKWKPKM...,ID: 4426-0\nName: <unknown name>\nDescription:...,1.0,99,"[[115.131, 146.1451, 131.1736, 119.1197, 131.1..."
7,,,0.176091,0.0,,0.342423,0.041393,4432,,4432-0,PQITLWQRPLVTVKIGGQLKEALLDTGADDTVLEEMNLPGRWKPKM...,ID: 4432-0\nName: <unknown name>\nDescription:...,1.0,99,"[[115.131, 146.1451, 131.1736, 119.1197, 131.1..."
14,,,0.491362,0.939519,,1.50515,1.227887,4664,,4664-0,PQITLWQRPIVTIKVGGQLIEALLDTGADDTVLEEINLPGRWKPKM...,ID: 4664-0\nName: <unknown name>\nDescription:...,1.0,99,"[[115.131, 146.1451, 131.1736, 119.1197, 131.1..."


In [8]:
features.head(3)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,89,90,91,92,93,94,95,96,97,98
6,115.131,146.1451,131.1736,119.1197,131.1736,204.2262,146.1451,174.2017,115.131,131.1736,...,131.1736,119.1197,146.1451,131.1736,75.0669,121.159,119.1197,131.1736,132.1184,165.19
7,115.131,146.1451,131.1736,119.1197,131.1736,204.2262,146.1451,174.2017,115.131,131.1736,...,131.1736,119.1197,146.1451,131.1736,75.0669,121.159,119.1197,131.1736,132.1184,165.19
14,115.131,146.1451,131.1736,119.1197,131.1736,204.2262,146.1451,174.2017,115.131,131.1736,...,149.2124,119.1197,146.1451,131.1736,75.0669,121.159,119.1197,131.1736,132.1184,165.19


## Define training functions

When writing code to interface with Dask, a functional style is often preferred, because each unit of work is a function that the `client` submits to the cluster. Hence, we will wrap the procedures that we need inside functions.

In [9]:
from utils import featurize_sequence_, fit_model, cross_validate, predict

Now, we'll scatter the data to the workers. `dataf` stands for "data future": `client.scatter` returns a future, which is a handle to a copy of `data` held in worker memory, so tasks can use it without the data being resent each time. The same applies to `featuresf`.

In [10]:
dataf = client.scatter(data)
featuresf = client.scatter(features)
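To see what a scattered future looks like in isolation, here is a small sketch on a toy DataFrame. The in-process client settings and the `column_total` helper are assumptions for illustration only.

```python
import pandas as pd
from dask.distributed import Client


def column_total(df):
    """Hypothetical task: sum the 'x' column of a DataFrame."""
    return int(df["x"].sum())


# In-process workers and no dashboard keep this sketch light (assumed settings).
client = Client(processes=False, dashboard_address=None)

toy = pd.DataFrame({"x": [1, 2, 3]})

# scatter() sends the DataFrame to a worker and returns a Future handle.
toyf = client.scatter(toy)

# Passing the future to submit() lets the task read the worker-side copy.
total = client.submit(column_total, toyf).result()

client.close()
```

The function receives an ordinary DataFrame; Dask resolves the future before calling it.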

Now, we fit the models, and collect their cross-validated scores.

In [11]:
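models = {}
scores = {}

# Queue one fit and one cross-validation task per drug; submit() does not block.
for drug in drugs:
    models[drug] = client.submit(fit_model, dataf, featuresf, drug)
    scores[drug] = client.submit(cross_validate, dataf, featuresf, drug)

# Block until every model has been fit, replacing the futures with results.
models = client.gather(models)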
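The same submit-then-gather pattern can be tried on a toy problem. In this sketch, `fit_toy` is a hypothetical stand-in for `fit_model`, and the in-process client settings are assumptions for illustration.

```python
import numpy as np
from dask.distributed import Client


def fit_toy(values, name):
    """Hypothetical stand-in for fit_model: returns a tiny 'model' dict."""
    return {"name": name, "mean": float(values.mean())}


# In-process workers and no dashboard keep this sketch light (assumed settings).
client = Client(processes=False, dashboard_address=None)

# Scatter the shared input once, then reuse the future in every task.
valuesf = client.scatter(np.array([1.0, 2.0, 3.0]))

# One future per task name, stored in a dict keyed by that name.
futures = {name: client.submit(fit_toy, valuesf, name) for name in ["ATV", "DRV"]}

# gather() accepts the dict and returns a dict of results with the same keys.
results = client.gather(futures)

client.close()
```

Keeping the futures in a dict means the gathered results stay keyed by task name, which is convenient when saving one file per model.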

Finally, let's save the models. To save space on disk, we will pickle and gzip them.

In [12]:
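import gzip
import os
import pickle as pkl

# Make sure the output directory exists before writing to it.
os.makedirs("data/models", exist_ok=True)

for name, model in models.items():
    with gzip.open(f"data/models/{name}.pkl.gz", 'wb') as f:
        pkl.dump(model, f)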
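Reading the models back is the mirror image of saving them. The sketch below round-trips a couple of toy objects through a temporary directory; the toy "models" and the directory layout are assumptions for illustration.

```python
import gzip
import pickle as pkl
import tempfile
from pathlib import Path

# Toy stand-ins for fitted models (hypothetical).
toy_models = {"ATV": {"coef": [0.1, 0.2]}, "DRV": {"coef": [0.3, 0.4]}}
suffix = ".pkl.gz"

with tempfile.TemporaryDirectory() as tmp:
    model_dir = Path(tmp) / "models"
    model_dir.mkdir(parents=True, exist_ok=True)

    # Save: pickle each object into its own gzip-compressed file.
    for name, model in toy_models.items():
        with gzip.open(model_dir / f"{name}{suffix}", "wb") as f:
            pkl.dump(model, f)

    # Load: recover the name from the file name and unpickle the contents.
    loaded = {}
    for path in sorted(model_dir.glob(f"*{suffix}")):
        with gzip.open(path, "rb") as f:
            loaded[path.name[: -len(suffix)]] = pkl.load(f)
```

As with any pickle, only load files that come from a source you trust.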

In [13]:
scores = client.gather(scores)
with gzip.open("data/scores.pkl.gz", "wb") as f:
    pkl.dump(scores, f)