{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Text Vectorization Pipeline\n", "\n", "This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n", "It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n", "\n", "The primary differences are that\n", "\n", "* We fit the entire model, including text vectorization, as a pipeline.\n", "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask DataFrame](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n", "  rather than generators to work with larger-than-memory datasets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client, progress\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fetch the data\n", "\n", "Scikit-Learn provides a utility to fetch the 20 newsgroups dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sklearn.datasets\n", "\n", "bunch = sklearn.datasets.fetch_20newsgroups()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The data from scikit-learn isn't *too* large, so it is simply\n", "returned in memory. Each document is a string. The target we're predicting\n", "is an integer that encodes the topic of the post.\n", "\n", "We'll load the documents and targets directly into a dask DataFrame.\n", "In practice, on a larger-than-memory dataset, you would likely load the\n", "documents from disk or cloud storage using `dask.bag` or `dask.delayed`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.dataframe as dd\n", "import pandas as pd\n", "\n", "df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n", "                    npartitions=25)\n", "\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Each row in the `text` column contains a bit of metadata followed by the full text of a post." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(df.head().loc[0, 'text'][:500])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Feature Hashing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n", "\n", "The transformation is lazy: it returns a dask Array, and the work happens in parallel once we actually compute the result." 
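] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a rough, hypothetical illustration of that per-partition behavior (a sketch, not Dask-ML's internal implementation), we can pull a single pandas partition out of `df` and transform it by hand with scikit-learn's stateless `HashingVectorizer`. Dask-ML does the equivalent for every partition, in parallel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustration only (not Dask-ML's internal code): hash one in-memory partition\n", "# with plain scikit-learn. Dask-ML applies the same stateless transformation\n", "# to every partition in parallel.\n", "import sklearn.feature_extraction.text\n", "\n", "sk_vect = sklearn.feature_extraction.text.HashingVectorizer()\n", "one_partition = df.get_partition(0)['text'].compute()  # one pandas Series\n", "sk_vect.transform(one_partition)  # a scipy.sparse matrix, like one block of the dask Array below" 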
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask_ml.feature_extraction.text\n", "\n", "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n", "X = vect.fit_transform(df['text'])\n", "X" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The output array `X` has unknown chunk sizes because the input dask Series or Bag doesn't know its own length.\n", "\n", "Each block in `X` is a `scipy.sparse` matrix." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "X.blocks[0].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is a document-term matrix. Each row is the hashed representation of the original post." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Classification Pipeline\n", "\n", "We can combine the [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) with [Incremental](https://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) and a classifier like scikit-learn's `SGDClassifier` to\n", "create a classification pipeline.\n", "\n", "We'll predict whether the topic was in one of the `comp` categories." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bunch.target_names" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n", "y = df['target'].isin(positive).astype(int)\n", "y" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sklearn.linear_model\n", "import sklearn.pipeline\n", "\n", "import dask_ml.wrappers" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because the input comes from a dask Series with unknown chunk sizes, we need to specify `assume_equal_chunks=True`. This tells Dask-ML that each partition in `X`\n", "matches a partition in `y`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sgd = sklearn.linear_model.SGDClassifier(\n", "    tol=1e-3\n", ")\n", "clf = dask_ml.wrappers.Incremental(\n", "    sgd, scoring='accuracy', assume_equal_chunks=True\n", ")\n", "pipe = sklearn.pipeline.make_pipeline(vect, clf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`SGDClassifier.partial_fit` needs to know the full set of classes up front.\n", "Because our `sgd` is wrapped inside an `Incremental`, we pass the classes through\n", "as the `incremental__classes` keyword argument to `fit`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipe.fit(df['text'], y,\n", "         incremental__classes=[0, 1]);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As usual, `Incremental.predict` lazily returns the predictions as a dask Array." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictions = pipe.predict(df['text'])\n", "predictions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can compute the predictions and the accuracy score in parallel with `dask_ml.metrics.accuracy_score`." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask_ml.metrics\n", "\n", "dask_ml.metrics.accuracy_score(y, predictions)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This simple combination of a `HashingVectorizer` and an `SGDClassifier` is\n", "quite effective at this prediction task." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }