{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed Scikit-Learn for CPU Bound Problems\n", "\n", "This example demonstrates how Dask can scale scikit-learn to a cluster of machines for a CPU-bound problem.\n", "We'll fit a large model, a grid-search over many hyper-parameters, on a small dataset.\n", "\n", "This video talks demonstrates the same example on a larger cluster." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "keep_output": true }, "outputs": [ { "data": { "text/html": [ "" ], "text/plain": [ "" ] }, "execution_count": null, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from IPython.display import HTML\n", "\n", "HTML(\"\"\"\"\"\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client, progress\n", "client = Client(processes=False, threads_per_worker=4, n_workers=1, memory_limit='2GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": { "keep_output": true }, "source": [ "## Distributed Training\n", "\n", " \n", "\n", "Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.\n", "\n", "Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code.\n", "\n", "This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (though Dask-ML provides other ways for working with larger than memory datasets)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask_ml.joblib # register the distriubted backend\n", "\n", "from pprint import pprint\n", "from time import time\n", "import logging\n", "\n", "from sklearn.datasets import fetch_20newsgroups\n", "from sklearn.feature_extraction.text import CountVectorizer\n", "from sklearn.feature_extraction.text import TfidfTransformer\n", "from sklearn.linear_model import SGDClassifier\n", "from sklearn.model_selection import GridSearchCV\n", "from sklearn.pipeline import Pipeline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Scale Up: set categories=None to use all the categories\n", "categories = [\n", " 'alt.atheism',\n", " 'talk.religion.misc',\n", "]\n", "\n", "print(\"Loading 20 newsgroups dataset for categories:\")\n", "print(categories)\n", "\n", "data = fetch_20newsgroups(subset='train', categories=categories)\n", "print(\"%d documents\" % len(data.filenames))\n", "print(\"%d categories\" % len(data.target_names))\n", "print()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll define a small pipeline that combines text feature extraction with a simple classifier." 
, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline = Pipeline([\n", "    ('vect', CountVectorizer()),\n", "    ('tfidf', TfidfTransformer()),\n", "    ('clf', SGDClassifier(max_iter=1000)),\n", "])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, define the grid of parameters to search over. Uncommenting more entries below enlarges the search (and the training time) combinatorially." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parameters = {\n", "    'vect__max_df': (0.5, 0.75, 1.0),\n", "    # 'vect__max_features': (None, 5000, 10000, 50000),\n", "    'vect__ngram_range': ((1, 1), (1, 2)),  # unigrams or bigrams\n", "    # 'tfidf__use_idf': (True, False),\n", "    # 'tfidf__norm': ('l1', 'l2'),\n", "    # 'clf__alpha': (0.00001, 0.000001),\n", "    # 'clf__penalty': ('l2', 'elasticnet'),\n", "    # 'clf__n_iter': (10, 50, 80),\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False, iid=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To fit this normally, we would write\n", "\n", "```python\n", "grid_search.fit(data.data, data.target)\n", "```\n", "\n", "That would use the default joblib backend (multiple processes) for parallelism.\n", "To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context. Passing `scatter=` pre-scatters the training data to the workers so it doesn't have to be re-sent with every task." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.externals import joblib" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with joblib.parallel_backend('dask', scatter=[data.data, data.target]):\n", "    grid_search.fit(data.data, data.target)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you had the Dask dashboard open during that fit, you would have noticed that each worker performed some of the fit tasks." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }