{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask DataFrames\n", "\n", "A Dask DataFrame coordinates many Pandas DataFrames/Series arranged along the index. It is partitioned row-wise, grouping rows by index value for efficiency. These Pandas objects may live on disk or on other machines.\n", "\n", "<img src=\"./images/dask-dataframe.svg\" alt=\"img\" width=400 />\n", "\n", "Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<table style=\"border: 2px solid white;\">\n", "<tr>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Client</h3>\n", "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n", " <li><b>Scheduler: </b>tcp://127.0.0.1:58039</li>\n", " <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>\n", "</ul>\n", "</td>\n", "<td style=\"vertical-align: top; border: 0px solid white\">\n", "<h3 style=\"text-align: left;\">Cluster</h3>\n", "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n", " <li><b>Workers: </b>2</li>\n", " <li><b>Cores: </b>4</li>\n", " <li><b>Memory: </b>1.86 GiB</li>\n", "</ul>\n", "</td>\n", "</tr>\n", "</table>" ], "text/plain": [ "<Client: 'tcp://127.0.0.1:58039' processes=2 threads=4, memory=1.86 GiB>" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create random time series data\n", "Create a random time series of data with the following properties:\n", "\n", "1. 
It stores a record for every second from 2000-01-01 to 2000-01-31\n", "\n", "2. It splits that period by day, keeping each day as a separate Pandas DataFrame (30 partitions)\n", "\n", "3. Along with a datetime index, it has columns for names, IDs, and numeric values\n", 
"\n", "This is a small dataset of about 240 MB. Increase the number of days or reduce the frequency to practice with a larger dataset." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import dask\n", "import dask.dataframe as dd\n", "df = dask.datasets.timeseries()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div><strong>Dask DataFrame Structure:</strong></div>\n", "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>name</th>\n", " <th>x</th>\n", " <th>y</th>\n", " </tr>\n", " <tr>\n", " <th>npartitions=30</th>\n", " <th></th>\n", " <th></th>\n", " <th></th>\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>2000-01-01</th>\n", " <td>int64</td>\n", " <td>object</td>\n", " <td>float64</td>\n", " <td>float64</td>\n", " </tr>\n", " <tr>\n", " <th>2000-01-02</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>...</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>2000-01-30</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>2000-01-31</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>\n", "<div>Dask Name: make-timeseries, 30 tasks</div>" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... 
...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask DataFrames are lazy, so no data is printed here\n", "df" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "id int64\n", "name object\n", "x float64\n", "y float64\n", "dtype: object" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use standard Pandas operations" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Dask Series Structure:\n", "npartitions=1\n", " float64\n", " ...\n", "Name: x, dtype: float64\n", "Dask Name: sqrt, 157 tasks" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Compute the standard deviation of x per name, for rows where y > 0\n", "df2 = df[df.y > 0]\n", "df3 = df2.groupby('name').x.std()\n", "df3" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 185 ms, sys: 19.9 ms, total: 204 ms\n", "Wall time: 1.28 s\n" ] } ], "source": [ "%time computed_df = df3.compute()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "name\n", "Alice 0.577885\n", "Bob 0.576743\n", "Charlie 0.579827\n", "Dan 0.577208\n", "Edith 0.577062\n", "Frank 0.576297\n", "George 0.575883\n", "Hannah 0.576153\n", "Ingrid 0.580452\n", "Jerry 0.574682\n", "Kevin 0.577536\n", "Laura 0.578090\n", "Michael 0.577965\n", "Norbert 0.578714\n", "Oliver 0.577328\n", "Patricia 0.577173\n", "Quinn 0.577750\n", "Ray 0.576110\n", "Sarah 0.576397\n", "Tim 0.577916\n", "Ursula 0.576522\n", "Victor 0.578701\n", "Wendy 0.576590\n", "Xavier 0.576980\n", "Yvonne 0.576429\n", "Zelda 0.576003\n", "Name: x, dtype: float64" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ 
"computed_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persist data in memory" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "df = df.persist()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 133 ms, sys: 12 ms, total: 145 ms\n", "Wall time: 494 ms\n" ] }, { "data": { "text/plain": [ "name\n", "Alice 0.577885\n", "Bob 0.576743\n", "Charlie 0.579827\n", "Dan 0.577208\n", "Edith 0.577062\n", "Frank 0.576297\n", "George 0.575883\n", "Hannah 0.576153\n", "Ingrid 0.580452\n", "Jerry 0.574682\n", "Kevin 0.577536\n", "Laura 0.578090\n", "Michael 0.577965\n", "Norbert 0.578714\n", "Oliver 0.577328\n", "Patricia 0.577173\n", "Quinn 0.577750\n", "Ray 0.576110\n", "Sarah 0.576397\n", "Tim 0.577916\n", "Ursula 0.576522\n", "Victor 0.578701\n", "Wendy 0.576590\n", "Xavier 0.576980\n", "Yvonne 0.576429\n", "Zelda 0.576003\n", "Name: x, dtype: float64" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df2 = df[df.y > 0]\n", "df3 = df2.groupby('name').x.std()\n", "%time df3.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the index\n", "\n", "Data is sorted by the index column. 
This allows faster access, joins, and groupby-apply operations.\n", "\n", "However, sorting data in parallel can be expensive, so setting the index is important but should be done only infrequently." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div><strong>Dask DataFrame Structure:</strong></div>\n", "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>x</th>\n", " <th>y</th>\n", " </tr>\n", " <tr>\n", " 
<th>npartitions=30</th>\n", " <th></th>\n", " <th></th>\n", " <th></th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>Alice</th>\n", " <td>int64</td>\n", " <td>float64</td>\n", " <td>float64</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>...</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>Zelda</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>Zelda</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>\n", "<div>Dask Name: sort_index, 1140 tasks</div>" ], "text/plain": [ "Dask DataFrame Structure:\n", " id x y\n", "npartitions=30 \n", "Alice int64 float64 float64\n", "Alice ... ... ...\n", "... ... ... ...\n", "Zelda ... ... ...\n", "Zelda ... ... ...\n", "Dask Name: sort_index, 1140 tasks" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Re-sort the data by name (name becomes the index)\n", "df = df.set_index('name')\n", "df" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "df = df.persist()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 39.2 ms, sys: 11.7 ms, total: 51 ms\n", "Wall time: 88 ms\n" ] }, { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>id</th>\n", " <th>x</th>\n", " <th>y</th>\n", " </tr>\n", " <tr>\n", " <th>name</th>\n", " <th></th>\n", " <th></th>\n", " <th></th>\n", " 
</tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>Alice</th>\n", " <td>928</td>\n", " <td>-0.523920</td>\n", " <td>0.226035</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>986</td>\n", " <td>-0.170906</td>\n", " <td>0.354172</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>931</td>\n", " <td>-0.499495</td>\n", " <td>0.417713</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>1003</td>\n", " <td>0.828322</td>\n", " <td>-0.492837</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>987</td>\n", " <td>0.580199</td>\n", " <td>0.944183</td>\n", " </tr>\n", " <tr>\n", " <th>...</th>\n", " <td>...</td>\n", " <td>...</td>\n", " <td>...</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>1036</td>\n", " <td>-0.247774</td>\n", " <td>-0.955866</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>1028</td>\n", " <td>-0.280042</td>\n", " <td>-0.701543</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>1037</td>\n", " <td>0.361707</td>\n", " <td>0.656865</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>970</td>\n", " <td>0.702504</td>\n", " <td>-0.732269</td>\n", " </tr>\n", " <tr>\n", " <th>Alice</th>\n", " <td>977</td>\n", " <td>-0.636661</td>\n", " <td>0.062147</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "<p>100244 rows × 3 columns</p>\n", "</div>" ], "text/plain": [ " id x y\n", "name \n", "Alice 928 -0.523920 0.226035\n", "Alice 986 -0.170906 0.354172\n", "Alice 931 -0.499495 0.417713\n", "Alice 1003 0.828322 -0.492837\n", "Alice 987 0.580199 0.944183\n", "... ... ... 
...\n", "Alice 1036 -0.247774 -0.955866\n", "Alice 1028 -0.280042 -0.701543\n", "Alice 1037 0.361707 0.656865\n", "Alice 970 0.702504 -0.732269\n", "Alice 977 -0.636661 0.062147\n", "\n", "[100244 rows x 3 columns]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Now that name is the index, lookups by name value are fast\n", "\n", "%time df.loc['Alice'].compute()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "client.cancel(df)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client\n", "_GatheringFuture exception was never retrieved\n", "future: <_GatheringFuture finished exception=CancelledError()>\n", "asyncio.exceptions.CancelledError\n" ] } ], "source": [ "client.shutdown()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }