{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask DataFrames\n", "\n", "\"Dask\n", " \n", "Dask Dataframes coordinate many Pandas dataframes, partitioned along an index. They support a large subset of the Pandas API." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Start Dask Client for Dashboard\n", "\n", "Starting the Dask Client is optional. It will provide a dashboard which \n", "is useful to gain insight on the computation. \n", "\n", "The link to the dashboard will become visible when you create the client below. We recommend having it open on one side of your screen while using your notebook on the other side. This can take some effort to arrange your windows, but seeing them both at the same is very useful when learning." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit=\"1GB\")\n", "client\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create Random Dataframe\n", "\n", "We create a random timeseries of data with the following attributes:\n", "\n", "1. It stores a record for every second in the month of January of the year 2000\n", "2. It splits that month by day, keeping each day as a partitioned dataframe\n", "3. Along with a datetime index it has columns for names, ids, and numeric values\n", "\n", "This is a small dataset of about 240 MB. Increase the number of days or reduce the time interval between data points to practice with a larger dataset by setting some of the [`dask.datasets.timeseries()` arguments](https://docs.dask.org/en/stable/api.html#dask.datasets.timeseries)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "\n", "df = dask.datasets.timeseries()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unlike Pandas, Dask DataFrames are _lazy_, meaning that data is only loaded when it is needed for a computation. No data is printed here, instead it is replaced by ellipses (`...`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Nonetheless, the column names and dtypes are known." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.dtypes\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Some operations will automatically display the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# This sets some formatting parameters for displayed data.\n", "import pandas as pd\n", "\n", "pd.options.display.precision = 2\n", "pd.options.display.max_rows = 10\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.head(3)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use Standard Pandas Operations\n", "\n", "Most common Pandas operations can be used in the same way on Dask dataframes. This example shows how to slice the data based on a mask condition and then determine the standard deviation of the data in the `x` column." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df2 = df[df.y > 0]\n", "df3 = df2.groupby(\"name\").x.std()\n", "df3\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the data in `df3` are still represented by ellipses. All of the operations in the previous cell are lazy operations. You can call `.compute()` when you want your result as a Pandas dataframe or series.\n", "\n", "If you started `Client()` above then you can watch the status page during computation to see the progress." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "computed_df = df3.compute()\n", "type(computed_df)\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "computed_df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the computed data are now shown in the output.\n", "\n", "Another example calculation is to aggregate multiple columns, as shown below. Once again, the dashboard will show the progress of the computation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df4 = df.groupby(\"name\").aggregate({\"x\": \"sum\", \"y\": \"max\"})\n", "df4.compute()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask dataframes can also be joined like Pandas dataframes. In this example we join the aggregated data in `df4` with the original data in `df`. Since the index in `df` is the timeseries and `df4` is indexed by names, we use `left_on=\"name\"` and `right_index=True` to define the merge columns. We also set suffixes for any columns that are common between the two dataframes so that we can distinguish them.\n", "\n", "Finally, since `df4` is small, we also make sure that it is a single partition dataframe." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df4 = df4.repartition(npartitions=1)\n", "joined = df.merge(\n", " df4, left_on=\"name\", right_index=True, suffixes=(\"_original\", \"_aggregated\")\n", ")\n", "joined.head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persist data in memory\n", "\n", "If you have the available RAM for your dataset then you can persist data in memory. \n", "\n", "This allows future computations to be much faster." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = df.persist()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Time Series Operations\n", "\n", "Because `df` has a datetime index, time-series operations work efficiently.\n", "\n", "The first example below resamples the data at 1 hour intervals to reduce the total size of the dataframe. Then the mean of the `x` and `y` columns are taken." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[[\"x\", \"y\"]].resample(\"1h\").mean().head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The next example resamples the data at 24 hour intervals and plots the mean values. Notice that `plot()` is called after `compute()` because `plot()` will not work until the data are computed." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "df[['x', 'y']].resample('24h').mean().compute().plot();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This final example computes the rolling 24 hour mean of the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df[[\"x\", \"y\"]].rolling(window=\"24h\").mean().head()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Random access is cheap along the index, but must since the Dask dataframe is lazy, it must be computed to materialize the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df.loc[\"2000-01-05\"]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df.loc['2000-01-05'].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set Index\n", "\n", "Data is sorted by the index column. This allows for faster access, joins, groupby-apply operations, and more. However sorting data can be costly to do in parallel, so setting the index is both important to do, but only infrequently. In the next few examples, we will group the data by the `name` column, so we will set that column as the index to improve efficiency." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df5 = df.set_index(\"name\")\n", "df5\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because resetting the index for this dataset is expensive and we can fit it in our available RAM, we persist the dataset to memory." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df5 = df5.persist()\n", "df5\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Dask now knows where all data lives, indexed by name. As a result operations like random access are cheap and efficient." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%time df5.loc['Alice'].compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Groupby Apply with Scikit-Learn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that our data is sorted by name we can inexpensively do operations like random access on name, or groupby-apply with custom functions.\n", "\n", "Here we train a different scikit-learn linear regression model on each name." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.linear_model import LinearRegression\n", "\n", "\n", "def train(partition):\n", " if not len(partition):\n", " return\n", " est = LinearRegression()\n", " est.fit(partition[[\"x\"]].values, partition.y.values)\n", " return est\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `partition` argument to `train()` will be one of the group instances from the `DataFrameGroupBy`. If there is no data in the partition, we don't need to proceed. If there is data, we want to fit the linear regression model and return that as the value for this group.\n", "\n", "Now working with `df5`, whose index is the names from `df`, we can group by the `names` column. This also happens to be the index, but that's fine. Then we use `.apply()` to run `train()` on each group in the `DataFrameGroupBy` generated by `.groupby()`.\n", "\n", "The `meta` argument tells Dask how to create the `DataFrame` or `Series` that will hold the result of `.apply()`. In this case, `train()` returns a single value, so `.apply()` will create a `Series`. This means we need to tell Dask what the type of that single column should be and optionally give it a name.\n", "\n", "The easiest way to specify a single column is with a tuple. The name of the column is the first element of the tuple. Since this is a series of linear regressions, we will name the column `\"LinearRegression\"`. The second element of the tuple is the type of the return value of `train`. In this case, Pandas will store the result as a general `object`, which should be the type we pass." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df6 = df5.groupby(\"name\").apply(\n", " train, meta=(\"LinearRegression\", object)\n", ").compute()\n", "df6" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Further Reading\n", "\n", "For a more in-depth introduction to Dask dataframes, see the [dask tutorial](https://github.com/dask/dask-tutorial), notebooks 04 and 07." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }