{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask DataFrames\n", "\n", "A Dask DataFrame coordinates many Pandas DataFrames/Series arranged along the index. It is partitioned row-wise, grouping rows by index value for efficiency. These Pandas objects may live on disk or on other machines.\n", "\n", "Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM." ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 2
  • \n", "
  • Cores: 4
  • \n", "
  • Memory: 1.86 GiB
  • \n", "
\n", "
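" ], "text/plain": [ "" ] }, "execution_count": 1, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from dask.distributed import Client, progress\n", "\n", "# Start a local cluster: 2 workers, 2 threads each, 1 GB memory limit per worker\n", "client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create random time series data\n", "Create a random time series with the following properties:\n", "\n", "1. It stores one record per second for January 2000 (the defaults of `dask.datasets.timeseries()`)\n", "\n", "2. It splits that month by day, keeping each day as a separate Pandas DataFrame partition\n", "\n", "3. Along with a datetime index, it has columns for names, IDs, and numeric values\n", "\n", "This is a small dataset of about 240 MB. Increase the number of days or reduce the frequency to practice with a larger dataset." ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "import dask\n", "import dask.dataframe as dd\n", "df = dask.datasets.timeseries()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/html": [ "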
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idnamexy
npartitions=30
2000-01-01int64objectfloat64float64
2000-01-02............
...............
2000-01-30............
2000-01-31............
\n", "
\n", "
Dask Name: make-timeseries, 30 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id name x y\n", "npartitions=30 \n", "2000-01-01 int64 object float64 float64\n", "2000-01-02 ... ... ... ...\n", "... ... ... ... ...\n", "2000-01-30 ... ... ... ...\n", "2000-01-31 ... ... ... ...\n", "Dask Name: make-timeseries, 30 tasks" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Dask DataFrames are lazy, so no data is printed here\n", "df" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "id int64\n", "name object\n", "x float64\n", "y float64\n", "dtype: object" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "df.dtypes" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use standard Pandas operations" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Dask Series Structure:\n", "npartitions=1\n", " float64\n", " ...\n", "Name: x, dtype: float64\n", "Dask Name: sqrt, 157 tasks" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Standard deviation of x for each name, using only rows where y > 0\n", "df2 = df[df.y > 0]\n", "df3 = df2.groupby('name').x.std()\n", "df3" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 185 ms, sys: 19.9 ms, total: 204 ms\n", "Wall time: 1.28 s\n" ] } ], "source": [ "%time computed_df = df3.compute()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "name\n", "Alice 0.577885\n", "Bob 0.576743\n", "Charlie 0.579827\n", "Dan 0.577208\n", "Edith 0.577062\n", "Frank 0.576297\n", "George 0.575883\n", "Hannah 0.576153\n", "Ingrid 0.580452\n", "Jerry 0.574682\n", "Kevin 0.577536\n", "Laura 0.578090\n", "Michael 0.577965\n", "Norbert 0.578714\n", "Oliver 0.577328\n", "Patricia 0.577173\n", "Quinn 0.577750\n", "Ray 0.576110\n", "Sarah 0.576397\n", "Tim 0.577916\n", "Ursula 0.576522\n", "Victor 0.578701\n", "Wendy 0.576590\n", "Xavier 0.576980\n", "Yvonne 0.576429\n", "Zelda 0.576003\n", "Name: x, dtype: float64" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "computed_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Persist data in memory" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "# Compute df once and keep the result in the workers' memory for reuse\n", "df = df.persist()" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 133 ms, sys: 12 ms, total: 145 ms\n", "Wall time: 494 ms\n" ] }, { "data": { "text/plain": [ "name\n", "Alice 0.577885\n", "Bob 0.576743\n", "Charlie 0.579827\n", "Dan 0.577208\n", "Edith 0.577062\n", "Frank 0.576297\n", "George 0.575883\n", "Hannah 0.576153\n", "Ingrid 0.580452\n", "Jerry 0.574682\n", "Kevin 0.577536\n", "Laura 0.578090\n", "Michael 0.577965\n", "Norbert 0.578714\n", "Oliver 0.577328\n", "Patricia 0.577173\n", "Quinn 0.577750\n", "Ray 0.576110\n", "Sarah 0.576397\n", "Tim 0.577916\n", "Ursula 0.576522\n", "Victor 0.578701\n", "Wendy 0.576590\n", "Xavier 0.576980\n", "Yvonne 0.576429\n", "Zelda 0.576003\n", "Name: x, dtype: float64" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Same computation as before, now reading from the persisted data\n", "df2 = df[df.y > 0]\n", "df3 = df2.groupby('name').x.std()\n", "%time df3.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the index\n", "\n", "Setting an index sorts the data by the index column. This allows faster access, joins, groupby-apply operations, and so on.\n", "\n", "However, sorting data in parallel can be costly, so setting the index is important but should be done only infrequently." ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
Dask DataFrame Structure:
\n", "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
npartitions=30
Aliceint64float64float64
Alice.........
............
Zelda.........
Zelda.........
\n", "
\n", "
Dask Name: sort_index, 1140 tasks
" ], "text/plain": [ "Dask DataFrame Structure:\n", " id x y\n", "npartitions=30 \n", "Alice int64 float64 float64\n", "Alice ... ... ...\n", "... ... ... ...\n", "Zelda ... ... ...\n", "Zelda ... ... ...\n", "Dask Name: sort_index, 1140 tasks" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Sort the data by name and make it the index (this requires an expensive parallel shuffle)\n", "df = df.set_index('name')\n", "df" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "# Keep the re-indexed data in memory so the sort is not repeated\n", "df = df.persist()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "CPU times: user 39.2 ms, sys: 11.7 ms, total: 51 ms\n", "Wall time: 88 ms\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
idxy
name
Alice928-0.5239200.226035
Alice986-0.1709060.354172
Alice931-0.4994950.417713
Alice10030.828322-0.492837
Alice9870.5801990.944183
............
Alice1036-0.247774-0.955866
Alice1028-0.280042-0.701543
Alice10370.3617070.656865
Alice9700.702504-0.732269
Alice977-0.6366610.062147
\n", "

100244 rows × 3 columns

\n", "
" ], "text/plain": [ " id x y\n", "name \n", "Alice 928 -0.523920 0.226035\n", "Alice 986 -0.170906 0.354172\n", "Alice 931 -0.499495 0.417713\n", "Alice 1003 0.828322 -0.492837\n", "Alice 987 0.580199 0.944183\n", "... ... ... ...\n", "Alice 1036 -0.247774 -0.955866\n", "Alice 1028 -0.280042 -0.701543\n", "Alice 1037 0.361707 0.656865\n", "Alice 970 0.702504 -0.732269\n", "Alice 977 -0.636661 0.062147\n", "\n", "[100244 rows x 3 columns]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# With name as the sorted index, selecting rows by a name value is now fast\n", "\n", "%time df.loc['Alice'].compute()" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "# Release the persisted data from the workers' memory\n", "client.cancel(df)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client\n", "_GatheringFuture exception was never retrieved\n", "future: <_GatheringFuture finished exception=CancelledError()>\n", "asyncio.exceptions.CancelledError\n" ] } ], "source": [ "# Shut down the connected scheduler and workers\n", "client.shutdown()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }