{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Dask DataFrames\n",
    "\n",
    "Dask DataFrames 协调沿索引排列的许多 Pandas DataFrames/Series。Dask DataFrame 按行分区,按索引值对行进行分组以提高效率。这些 Pandas 对象可能存在于磁盘或其他机器上。\n",
    "\n",
    "<img src=\"./images/dask-dataframe.svg\" alt=\"img\" width=400 />\n",
    "\n",
    "Pandas 非常适合适合内存的表格数据集。当您要分析的数据集大于机器的 RAM 时,Dask 会变得很有用。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<table style=\"border: 2px solid white;\">\n",
       "<tr>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Client</h3>\n",
       "<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
       "  <li><b>Scheduler: </b>tcp://127.0.0.1:58039</li>\n",
       "  <li><b>Dashboard: </b><a href='http://127.0.0.1:8787/status' target='_blank'>http://127.0.0.1:8787/status</a></li>\n",
       "</ul>\n",
       "</td>\n",
       "<td style=\"vertical-align: top; border: 0px solid white\">\n",
       "<h3 style=\"text-align: left;\">Cluster</h3>\n",
       "<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
       "  <li><b>Workers: </b>2</li>\n",
       "  <li><b>Cores: </b>4</li>\n",
       "  <li><b>Memory: </b>1.86 GiB</li>\n",
       "</ul>\n",
       "</td>\n",
       "</tr>\n",
       "</table>"
      ],
      "text/plain": [
       "<Client: 'tcp://127.0.0.1:58039' processes=2 threads=4, memory=1.86 GiB>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from dask.distributed import Client, progress\n",
    "\n",
    "client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 创建随机时间序列数据\n",
    "创建具有以下属性的随机数据时间序列:\n",
    "\n",
    "1. 它存储 2000 年每 10 秒的记录\n",
    "\n",
    "2. 它逐月拆分,将每个月保留为单独的 Pandas 数据框\n",
    "\n",
    "3. 除了日期时间索引,它还包含名称、ID 和数值列\n",
    "\n",
    "这是一个大约 240 MB 的小数据集。增加天数或减少练习更大数据集的频率。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "import dask.dataframe as dd\n",
    "df = dask.datasets.timeseries()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div><strong>Dask DataFrame Structure:</strong></div>\n",
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>name</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>npartitions=30</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>2000-01-01</th>\n",
       "      <td>int64</td>\n",
       "      <td>object</td>\n",
       "      <td>float64</td>\n",
       "      <td>float64</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-02</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-30</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2000-01-31</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>\n",
       "<div>Dask Name: make-timeseries, 30 tasks</div>"
      ],
      "text/plain": [
       "Dask DataFrame Structure:\n",
       "                   id    name        x        y\n",
       "npartitions=30                                 \n",
       "2000-01-01      int64  object  float64  float64\n",
       "2000-01-02        ...     ...      ...      ...\n",
       "...               ...     ...      ...      ...\n",
       "2000-01-30        ...     ...      ...      ...\n",
       "2000-01-31        ...     ...      ...      ...\n",
       "Dask Name: make-timeseries, 30 tasks"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Dask DataFrames 是惰性的,因此这里不打印数据\n",
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "id        int64\n",
       "name     object\n",
       "x       float64\n",
       "y       float64\n",
       "dtype: object"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.dtypes"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 使用标准 Pandas 操作"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Dask Series Structure:\n",
       "npartitions=1\n",
       "    float64\n",
       "        ...\n",
       "Name: x, dtype: float64\n",
       "Dask Name: sqrt, 157 tasks"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 计算方差\n",
    "df2 = df[df.y > 0]\n",
    "df3 = df2.groupby('name').x.std()\n",
    "df3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 185 ms, sys: 19.9 ms, total: 204 ms\n",
      "Wall time: 1.28 s\n"
     ]
    }
   ],
   "source": [
    "%time computed_df = df3.compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "name\n",
       "Alice       0.577885\n",
       "Bob         0.576743\n",
       "Charlie     0.579827\n",
       "Dan         0.577208\n",
       "Edith       0.577062\n",
       "Frank       0.576297\n",
       "George      0.575883\n",
       "Hannah      0.576153\n",
       "Ingrid      0.580452\n",
       "Jerry       0.574682\n",
       "Kevin       0.577536\n",
       "Laura       0.578090\n",
       "Michael     0.577965\n",
       "Norbert     0.578714\n",
       "Oliver      0.577328\n",
       "Patricia    0.577173\n",
       "Quinn       0.577750\n",
       "Ray         0.576110\n",
       "Sarah       0.576397\n",
       "Tim         0.577916\n",
       "Ursula      0.576522\n",
       "Victor      0.578701\n",
       "Wendy       0.576590\n",
       "Xavier      0.576980\n",
       "Yvonne      0.576429\n",
       "Zelda       0.576003\n",
       "Name: x, dtype: float64"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "computed_df"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 持久化数据到内存中"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.persist()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 133 ms, sys: 12 ms, total: 145 ms\n",
      "Wall time: 494 ms\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "name\n",
       "Alice       0.577885\n",
       "Bob         0.576743\n",
       "Charlie     0.579827\n",
       "Dan         0.577208\n",
       "Edith       0.577062\n",
       "Frank       0.576297\n",
       "George      0.575883\n",
       "Hannah      0.576153\n",
       "Ingrid      0.580452\n",
       "Jerry       0.574682\n",
       "Kevin       0.577536\n",
       "Laura       0.578090\n",
       "Michael     0.577965\n",
       "Norbert     0.578714\n",
       "Oliver      0.577328\n",
       "Patricia    0.577173\n",
       "Quinn       0.577750\n",
       "Ray         0.576110\n",
       "Sarah       0.576397\n",
       "Tim         0.577916\n",
       "Ursula      0.576522\n",
       "Victor      0.578701\n",
       "Wendy       0.576590\n",
       "Xavier      0.576980\n",
       "Yvonne      0.576429\n",
       "Zelda       0.576003\n",
       "Name: x, dtype: float64"
      ]
     },
     "execution_count": 15,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df2 = df[df.y > 0]\n",
    "df3 = df2.groupby('name').x.std()\n",
    "%time df3.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 设置索引\n",
    "\n",
    "数据是按索引列排序。这允许更快的访问、joins、groupby-apply操作。\n",
    "\n",
    "然而,对数据进行并行排序可能代价高昂,因此设置索引很重要,但很少使用。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div><strong>Dask DataFrame Structure:</strong></div>\n",
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>npartitions=30</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>int64</td>\n",
       "      <td>float64</td>\n",
       "      <td>float64</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Zelda</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Zelda</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>\n",
       "<div>Dask Name: sort_index, 1140 tasks</div>"
      ],
      "text/plain": [
       "Dask DataFrame Structure:\n",
       "                   id        x        y\n",
       "npartitions=30                         \n",
       "Alice           int64  float64  float64\n",
       "Alice             ...      ...      ...\n",
       "...               ...      ...      ...\n",
       "Zelda             ...      ...      ...\n",
       "Zelda             ...      ...      ...\n",
       "Dask Name: sort_index, 1140 tasks"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 按照名称重排序\n",
    "df = df.set_index('name')\n",
    "df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.persist()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "CPU times: user 39.2 ms, sys: 11.7 ms, total: 51 ms\n",
      "Wall time: 88 ms\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>id</th>\n",
       "      <th>x</th>\n",
       "      <th>y</th>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>name</th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "      <th></th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>928</td>\n",
       "      <td>-0.523920</td>\n",
       "      <td>0.226035</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>986</td>\n",
       "      <td>-0.170906</td>\n",
       "      <td>0.354172</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>931</td>\n",
       "      <td>-0.499495</td>\n",
       "      <td>0.417713</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>1003</td>\n",
       "      <td>0.828322</td>\n",
       "      <td>-0.492837</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>987</td>\n",
       "      <td>0.580199</td>\n",
       "      <td>0.944183</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>...</th>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "      <td>...</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>1036</td>\n",
       "      <td>-0.247774</td>\n",
       "      <td>-0.955866</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>1028</td>\n",
       "      <td>-0.280042</td>\n",
       "      <td>-0.701543</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>1037</td>\n",
       "      <td>0.361707</td>\n",
       "      <td>0.656865</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>970</td>\n",
       "      <td>0.702504</td>\n",
       "      <td>-0.732269</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>Alice</th>\n",
       "      <td>977</td>\n",
       "      <td>-0.636661</td>\n",
       "      <td>0.062147</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "<p>100244 rows × 3 columns</p>\n",
       "</div>"
      ],
      "text/plain": [
       "         id         x         y\n",
       "name                           \n",
       "Alice   928 -0.523920  0.226035\n",
       "Alice   986 -0.170906  0.354172\n",
       "Alice   931 -0.499495  0.417713\n",
       "Alice  1003  0.828322 -0.492837\n",
       "Alice   987  0.580199  0.944183\n",
       "...     ...       ...       ...\n",
       "Alice  1036 -0.247774 -0.955866\n",
       "Alice  1028 -0.280042 -0.701543\n",
       "Alice  1037  0.361707  0.656865\n",
       "Alice   970  0.702504 -0.732269\n",
       "Alice   977 -0.636661  0.062147\n",
       "\n",
       "[100244 rows x 3 columns]"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 此时针对name字段的值进行索引就很快\n",
    "\n",
    "%time df.loc['Alice'].compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.cancel(df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client\n",
      "_GatheringFuture exception was never retrieved\n",
      "future: <_GatheringFuture finished exception=CancelledError()>\n",
      "asyncio.exceptions.CancelledError\n"
     ]
    }
   ],
   "source": [
    "client.shutdown()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}