{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask for Machine Learning" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask integrates well with machine learning libraries like [scikit-learn](http://scikit-learn.org/).\n", "\n", "[Dask-ML](http://dask-ml.readthedocs.io/en/latest/index.html) implements scalable machine learning algorithms that are compatible with scikit-learn." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask_gateway import Gateway\n", "\n", "gateway = Gateway()\n", "cluster = gateway.new_cluster()\n", "cluster.scale(10)\n", "cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client, progress\n", "c = Client(cluster)\n", "c" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributed Training\n", "\n", "Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.\n", "\n", "Dask registers a joblib backend. This lets you train those estimators using all the cores of your *cluster*, by changing one line of code. \n", "\n", "This is most useful for training large models on medium-sized datasets. You may have a large model when searching over many hyper-parameters, or when using an ensemble method with many individual estimators. For too small datasets, training times will typically be small enough that cluster-wide parallelism isn't helpful. For too large datasets (larger than a single machine's memory), the scikit-learn estimators may not be able to cope (see below)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.datasets import make_classification\n", "from sklearn.svm import SVC\n", "from sklearn.model_selection import GridSearchCV\n", "import pandas as pd" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X, y = make_classification(n_samples=1000, random_state=0)\n", "X[:5]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best combination of hyperparameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "param_grid = {\"C\": [0.001, 0.01, 0.1, 0.5, 1.0, 2.0, 5.0, 10.0],\n", " \"kernel\": ['rbf', 'poly', 'sigmoid'],\n", " \"shrinking\": [True, False]}\n", "\n", "grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),\n", " param_grid=param_grid,\n", " return_train_score=False,\n", " n_jobs=-1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To fit that normally, we'd call\n", "\n", "```python\n", "grid_search.fit(X, y)\n", "```\n", "\n", "To fit it using the cluster, we just need to use a context manager provided by joblib.\n", "We'll pre-scatter the data to each worker, which can help with performance." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import joblib\n", "\n", "with joblib.parallel_backend('dask', scatter=[X, y]):\n", " grid_search.fit(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We fit 48 different models, one for each hyper-parameter combination in `param_grid`, distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pd.DataFrame(grid_search.cv_results_).head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "grid_search.predict(X)[:5]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "grid_search.score(X, y)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Training on Large Datasets\n", "\n", "Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.\n", "\n", "All of the algorithms implemented in Dask-ML work well on larger than memory datasets, which you might store in a [dask array](http://dask.pydata.org/en/latest/array.html) or [dataframe](http://dask.pydata.org/en/latest/dataframe.html)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask_ml.datasets\n", "import dask_ml.cluster\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X, y = dask_ml.datasets.make_blobs(n_samples=10000000,\n", " chunks=1000000,\n", " random_state=0,\n", " centers=3)\n", "X = X.persist()\n", "X" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll use the k-means implemented in Dask-ML to cluster the points. It uses the `k-means||` (read: \"k-means parallel\") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)\n", "km.fit(X)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We'll plot a sample of points, colored by the cluster each falls into." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, ax = plt.subplots()\n", "ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],\n", " cmap='viridis', alpha=0.25);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For all the estimators implemented in Dask-ML, see the [API documentation](http://dask-ml.readthedocs.io/en/latest/modules/api.html)." 
] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 2 }