{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask for Machine Learning\n",
"\n",
"This is a high-level overview demonstrating some the components of Dask-ML.\n",
"See [here](./machine-learning) for more details on each individual component."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, progress\n",
"client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Distributed Training\n",
"\n",
" \n",
"\n",
"Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.\n",
"\n",
"Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code. \n",
"\n",
"This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_ml.joblib # register the distriubted backend\n",
"from sklearn.datasets import make_classification\n",
"from sklearn.svm import SVC\n",
"from sklearn.model_selection import GridSearchCV\n",
"import pandas as pd"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"X, y = make_classification(n_samples=1000, random_state=0)\n",
"X[:5]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best value of the $C$ hyperparameter."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"param_grid = {\"C\": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],\n",
" \"kernel\": ['rbf', 'poly', 'sigmoid'],\n",
" \"shrinking\": [True, False]}\n",
"\n",
"grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),\n",
" param_grid=param_grid,\n",
" return_train_score=False,\n",
" iid=True,\n",
" n_jobs=-1)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To fit that normally, we'd call\n",
"\n",
"```python\n",
"grid_search.fit(X, y)\n",
"```\n",
"\n",
"To fit it using the cluster, we just need to use a context manager provided by joblib.\n",
"We'll pre-scatter the data to each worker, which can help with performance."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.externals import joblib\n",
"\n",
"with joblib.parallel_backend('dask', scatter=[X, y]):\n",
" grid_search.fit(X, y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We fit 48 different models, one for each hyper-parameter combination in `param_grid`, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc."
]
},
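{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough check on the amount of work involved, the sketch below counts the candidate combinations in a grid like the one above using only the standard library. The fold count (`n_folds = 5`) is an assumption for illustration; the actual default depends on the installed scikit-learn version.\n",
"\n",
"```python\n",
"from itertools import product\n",
"\n",
"# Same grid as above, repeated so the sketch is self-contained.\n",
"param_grid = {'C': [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],\n",
"              'kernel': ['rbf', 'poly', 'sigmoid'],\n",
"              'shrinking': [True, False]}\n",
"\n",
"# Every combination of one value per hyper-parameter is one candidate.\n",
"candidates = [dict(zip(param_grid, values))\n",
"              for values in product(*param_grid.values())]\n",
"n_candidates = len(candidates)\n",
"\n",
"n_folds = 5  # assumed fold count, not taken from the notebook\n",
"n_fits = n_candidates * n_folds + 1  # +1 for the final refit on all the data\n",
"print(n_candidates, n_fits)\n",
"```\n",
"\n",
"Each of these fits is an independent task, which is why the search spreads well across workers."
]
},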
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pd.DataFrame(grid_search.cv_results_).head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grid_search.predict(X)[:5]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grid_search.score(X, y)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Training on Large Datasets\n",
"\n",
"Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.\n",
"\n",
"All of the algorithms implemented in Dask-ML work well on larger than memory datasets, which you might store in a [dask array](http://dask.pydata.org/en/latest/array.html) or [dataframe](http://dask.pydata.org/en/latest/dataframe.html)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_ml.datasets\n",
"import dask_ml.cluster\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"X, y = dask_ml.datasets.make_blobs(n_samples=10000000,\n",
" chunks=1000000,\n",
" random_state=0,\n",
" centers=3)\n",
"X = X.persist()\n",
"X"
]
},
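{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for how the array above is partitioned, here is a back-of-the-envelope sketch in plain Python. It assumes two `float64` features per row, which is an assumption rather than something stated in this notebook; check `X.shape` and `X.dtype` for the actual values.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"n_samples = 10_000_000  # rows requested from make_blobs above\n",
"chunk_rows = 1_000_000  # rows per chunk\n",
"n_features = 2          # assumed number of feature columns\n",
"itemsize = 8            # assumed float64 values, 8 bytes each\n",
"\n",
"# Number of row-wise chunks and approximate memory footprint in megabytes.\n",
"n_chunks = math.ceil(n_samples / chunk_rows)\n",
"chunk_mb = chunk_rows * n_features * itemsize / 1e6\n",
"total_mb = n_samples * n_features * itemsize / 1e6\n",
"print(n_chunks, chunk_mb, total_mb)\n",
"```\n",
"\n",
"Under those assumptions, each chunk is far smaller than the `2GB` memory limit given to the worker at the top of the notebook, so calling `persist` on `X` is reasonable here."
]
},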
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll use the k-means implemented in Dask-ML to cluster the points. It uses the `k-means||` (read: \"k-means parallel\") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)\n",
"km.fit(X)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll plot a sample of points, colored by the cluster each falls into."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"fig, ax = plt.subplots()\n",
"ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],\n",
" cmap='viridis', alpha=0.25);"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For all the estimators implemented in Dask-ML, see the [API documentation](http://dask-ml.readthedocs.io/en/latest/modules/api.html)."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}