{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Distributed Scikit-Learn for CPU Bound Problems\n",
"\n",
"This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem.\n",
"We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.\n",
"\n",
"This video talks demonstrates the same example on a larger cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"keep_output": true
},
"outputs": [
{
"data": {
"text/html": [
""
],
"text/plain": [
""
]
},
"execution_count": null,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from IPython.display import HTML\n",
"\n",
"HTML(\"\"\"\"\"\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Client, progress\n",
"client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {
"keep_output": true
},
"source": [
"## Distributed Training\n",
"\n",
" \n",
"\n",
"Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.\n",
"\n",
"Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code.\n",
"\n",
"This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger than memory datasets)."
]
},
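{
"cell_type": "markdown",
"metadata": {},
"source": [
"Both backends rely on the same idea: the individual fits are independent of one another, so they can be handed to a pool of workers. The sketch below illustrates that pattern with the standard library's `concurrent.futures` instead of joblib or Dask, so it is an analogy rather than what scikit-learn runs internally. `fake_fit` and its scores are hypothetical stand-ins for training and scoring one estimator.\n",
"\n",
"```python\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"\n",
"def fake_fit(alpha):\n",
"    # Hypothetical stand-in for fitting one model; returns a made-up score.\n",
"    return alpha, round(1.0 - alpha, 2)\n",
"\n",
"\n",
"alphas = [0.1, 0.2, 0.3, 0.4]\n",
"\n",
"# Each task is independent, so the pool may run them in any order;\n",
"# map() still returns the results in input order.\n",
"with ThreadPoolExecutor(max_workers=4) as pool:\n",
"    results = list(pool.map(fake_fit, alphas))\n",
"\n",
"# Pick the candidate with the highest (made-up) score.\n",
"best_alpha, best_score = max(results, key=lambda r: r[1])\n",
"print(best_alpha, best_score)\n",
"```\n",
"\n",
"Swapping the pool for a cluster-backed one changes where the tasks run, not how they are written, which is roughly what changing the joblib backend does."
]
},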
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask_ml.joblib # register the distriubted backend\n",
"\n",
"from pprint import pprint\n",
"from time import time\n",
"import logging\n",
"\n",
"from sklearn.datasets import fetch_20newsgroups\n",
"from sklearn.feature_extraction.text import CountVectorizer\n",
"from sklearn.feature_extraction.text import TfidfTransformer\n",
"from sklearn.linear_model import SGDClassifier\n",
"from sklearn.model_selection import GridSearchCV\n",
"from sklearn.pipeline import Pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Scale Up: set categories=None to use all the categories\n",
"categories = [\n",
" 'alt.atheism',\n",
" 'talk.religion.misc',\n",
"]\n",
"\n",
"print(\"Loading 20 newsgroups dataset for categories:\")\n",
"print(categories)\n",
"\n",
"data = fetch_20newsgroups(subset='train', categories=categories)\n",
"print(\"%d documents\" % len(data.filenames))\n",
"print(\"%d categories\" % len(data.target_names))\n",
"print()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We'll define a small pipeline that combines text feature extraction with a simple classifier."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pipeline = Pipeline([\n",
" ('vect', CountVectorizer()),\n",
" ('tfidf', TfidfTransformer()),\n",
" ('clf', SGDClassifier(max_iter=1000)),\n",
"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Grid search over some parameters."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"parameters = {\n",
" 'vect__max_df': (0.5, 0.75, 1.0),\n",
" #'vect__max_features': (None, 5000, 10000, 50000),\n",
" 'vect__ngram_range': ((1, 1), (1, 2)), # unigrams or bigrams\n",
" #'tfidf__use_idf': (True, False),\n",
" #'tfidf__norm': ('l1', 'l2'),\n",
" # 'clf__alpha': (0.00001, 0.000001),\n",
" # 'clf__penalty': ('l2', 'elasticnet'),\n",
" #'clf__n_iter': (10, 50, 80),\n",
"}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)"
]
},
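{
"cell_type": "markdown",
"metadata": {},
"source": [
"To get a feel for how much work this search involves, the sketch below counts the fits: each combination of the active grid values is fit once per cross-validation fold. It uses only the standard library and copies the uncommented grid entries and `cv=3` from the cells above. `n_candidates` and `n_fits` are illustrative names, not scikit-learn attributes.\n",
"\n",
"```python\n",
"from itertools import product\n",
"\n",
"# Active (uncommented) entries of the grid defined above.\n",
"grid = {\n",
"    'vect__max_df': (0.5, 0.75, 1.0),\n",
"    'vect__ngram_range': ((1, 1), (1, 2)),\n",
"}\n",
"cv = 3  # number of cross-validation folds passed to GridSearchCV\n",
"\n",
"# Every combination of grid values is one candidate pipeline.\n",
"candidates = [dict(zip(grid, values)) for values in product(*grid.values())]\n",
"n_candidates = len(candidates)\n",
"\n",
"# With refit=False there is no final refit, so the total is candidates x folds.\n",
"n_fits = n_candidates * cv\n",
"print(n_candidates, 'candidates,', n_fits, 'fits')\n",
"```\n",
"\n",
"Uncommenting more grid entries multiplies the candidate count, which is when spreading the fits over a cluster starts to pay off."
]
},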
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To fit this normally, we would write\n",
"\n",
"\n",
"```python\n",
"grid_search.fit(data.data, data.target)\n",
"```\n",
"\n",
"That would use the default joblib backend (multiple processes) for parallelism.\n",
"To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from sklearn.externals import joblib"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with joblib.parallel_backend('dask', scatter=[data.data, data.target]):\n",
" grid_search.fit(data.data, data.target)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you had your distributed dashboard open during that fit, you'll notice that each worker performs some of the fit tasks."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}