{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Dask Bag\n",
    "\n",
    "Dask-bag 擅长处理可以表示为任意输入序列的数据。我们将其称为“杂乱”数据,因为它可能包含复杂的嵌套结构、缺失的字段、数据类型的混合等。\n",
    "\n",
    "默认情况下,dask.bag使用dask.multiprocessing的计算。作为一个好处,Dask 绕过GIL并在纯 Python 对象上使用多个内核。作为一个缺点,Dask Bag 在包含大量工作间通信的计算中表现不佳。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "client = Client(n_workers=4, threads_per_worker=1)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 创建随机数据\n",
    "\n",
    "创建一组随机的记录数据,并将其以JSON形式存储到磁盘。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['/Users/kefei/demo/dask-learn/data/0.json',\n",
       " '/Users/kefei/demo/dask-learn/data/1.json',\n",
       " '/Users/kefei/demo/dask-learn/data/2.json',\n",
       " '/Users/kefei/demo/dask-learn/data/3.json',\n",
       " '/Users/kefei/demo/dask-learn/data/4.json',\n",
       " '/Users/kefei/demo/dask-learn/data/5.json',\n",
       " '/Users/kefei/demo/dask-learn/data/6.json',\n",
       " '/Users/kefei/demo/dask-learn/data/7.json',\n",
       " '/Users/kefei/demo/dask-learn/data/8.json',\n",
       " '/Users/kefei/demo/dask-learn/data/9.json']"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import dask\n",
    "import json\n",
    "import os\n",
    "\n",
    "os.makedirs('data', exist_ok=True)              \n",
    "\n",
    "b = dask.datasets.make_people()                \n",
    "b.map(json.dumps).to_textfiles('data/*.json')  "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 读取JSON数据"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{\"age\": 51, \"name\": [\"Chas\", \"Ratliff\"], \"occupation\": \"Seaman\", \"telephone\": \"344-219-9130\", \"address\": {\"address\": \"1135 Hawthorne Concession road\", \"city\": \"Springfield\"}, \"credit-card\": {\"number\": \"3416 044674 28287\", \"expiration-date\": \"06/20\"}}\n",
      "{\"age\": 53, \"name\": [\"Wilbur\", \"Cannon\"], \"occupation\": \"Geophysicist\", \"telephone\": \"(219) 049-8427\", \"address\": {\"address\": \"871 Bernal Heights Lane\", \"city\": \"Hallandale Beach\"}, \"credit-card\": {\"number\": \"4315 9929 2231 3721\", \"expiration-date\": \"04/22\"}}\n"
     ]
    }
   ],
   "source": [
    "!head -n 2 data/0.json"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dask.bag<loads, npartitions=20>"
      ]
     },
     "execution_count": 10,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import dask.bag as db\n",
    "import json\n",
    "\n",
    "b = db.read_text('data/*.json').map(json.loads)\n",
    "b"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "({'age': 51,\n",
       "  'name': ['Chas', 'Ratliff'],\n",
       "  'occupation': 'Seaman',\n",
       "  'telephone': '344-219-9130',\n",
       "  'address': {'address': '1135 Hawthorne Concession road',\n",
       "   'city': 'Springfield'},\n",
       "  'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},\n",
       " {'age': 53,\n",
       "  'name': ['Wilbur', 'Cannon'],\n",
       "  'occupation': 'Geophysicist',\n",
       "  'telephone': '(219) 049-8427',\n",
       "  'address': {'address': '871 Bernal Heights Lane',\n",
       "   'city': 'Hallandale Beach'},\n",
       "  'credit-card': {'number': '4315 9929 2231 3721',\n",
       "   'expiration-date': '04/22'}})"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 查看前两个数据\n",
    "b.take(2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Map, Filter, Aggregate"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "({'age': 51,\n",
       "  'name': ['Chas', 'Ratliff'],\n",
       "  'occupation': 'Seaman',\n",
       "  'telephone': '344-219-9130',\n",
       "  'address': {'address': '1135 Hawthorne Concession road',\n",
       "   'city': 'Springfield'},\n",
       "  'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},\n",
       " {'age': 53,\n",
       "  'name': ['Wilbur', 'Cannon'],\n",
       "  'occupation': 'Geophysicist',\n",
       "  'telephone': '(219) 049-8427',\n",
       "  'address': {'address': '871 Bernal Heights Lane',\n",
       "   'city': 'Hallandale Beach'},\n",
       "  'credit-card': {'number': '4315 9929 2231 3721',\n",
       "   'expiration-date': '04/22'}})"
      ]
     },
     "execution_count": 12,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 查看年龄超过30的人\n",
    "b.filter(lambda record: record['age'] > 30).take(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "('Seaman', 'Geophysicist')"
      ]
     },
     "execution_count": 13,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 查看每个记录中的occupation值\n",
    "b.map(lambda record: record['occupation']).take(2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "17025"
      ]
     },
     "execution_count": 14,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 统计数量\n",
    "b.count().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 链式计算"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "dask.bag<topk-aggregate, npartitions=1>"
      ]
     },
     "execution_count": 16,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# 计算年龄大于30的按occupation排序的前10位\n",
    "result = (b.filter(lambda record: record['age'] > 30)\n",
    "           .map(lambda record: record['occupation'])\n",
    "           .frequencies(sort=True)\n",
    "           .topk(10, key=1))\n",
    "result"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('Stonemason', 25),\n",
       " ('Nursing Manager', 24),\n",
       " ('Theatrical Agent', 23),\n",
       " ('Horticulturalist', 23),\n",
       " ('Valve Technician', 23),\n",
       " ('Landworker', 22),\n",
       " ('Mortician', 22),\n",
       " ('Blind Fitter', 21),\n",
       " ('Care Assistant', 21),\n",
       " ('Seamstress', 21)]"
      ]
     },
     "execution_count": 17,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "result.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 数据转换+存储"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['/Users/kefei/demo/dask-learn/data/processed.00.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.01.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.02.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.03.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.04.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.05.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.06.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.07.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.08.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.09.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.10.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.11.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.12.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.13.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.14.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.15.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.16.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.17.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.18.json',\n",
       " '/Users/kefei/demo/dask-learn/data/processed.19.json']"
      ]
     },
     "execution_count": 18,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "(b.filter(lambda record: record['age'] > 30) \n",
    "  .map(json.dumps)                           \n",
    "  .to_textfiles('data/processed.*.json'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 预处理后转换为DataFramesb.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "({'age': 51,\n",
       "  'name': ['Chas', 'Ratliff'],\n",
       "  'occupation': 'Seaman',\n",
       "  'telephone': '344-219-9130',\n",
       "  'address': {'address': '1135 Hawthorne Concession road',\n",
       "   'city': 'Springfield'},\n",
       "  'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},)"
      ]
     },
     "execution_count": 19,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "b.take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "({'age': 51,\n",
       "  'occupation': 'Seaman',\n",
       "  'telephone': '344-219-9130',\n",
       "  'credit-card-number': '3416 044674 28287',\n",
       "  'credit-card-expiration': '06/20',\n",
       "  'name': 'Chas Ratliff',\n",
       "  'street-address': '1135 Hawthorne Concession road',\n",
       "  'city': 'Springfield'},)"
      ]
     },
     "execution_count": 20,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "def flatten(record):\n",
    "    return {\n",
    "        'age': record['age'],\n",
    "        'occupation': record['occupation'],\n",
    "        'telephone': record['telephone'],\n",
    "        'credit-card-number': record['credit-card']['number'],\n",
    "        'credit-card-expiration': record['credit-card']['expiration-date'],\n",
    "        'name': ' '.join(record['name']),\n",
    "        'street-address': record['address']['address'],\n",
    "        'city': record['address']['city']\n",
    "    }\n",
    "\n",
    "b.map(flatten).take(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<style scoped>\n",
       "    .dataframe tbody tr th:only-of-type {\n",
       "        vertical-align: middle;\n",
       "    }\n",
       "\n",
       "    .dataframe tbody tr th {\n",
       "        vertical-align: top;\n",
       "    }\n",
       "\n",
       "    .dataframe thead th {\n",
       "        text-align: right;\n",
       "    }\n",
       "</style>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>age</th>\n",
       "      <th>occupation</th>\n",
       "      <th>telephone</th>\n",
       "      <th>credit-card-number</th>\n",
       "      <th>credit-card-expiration</th>\n",
       "      <th>name</th>\n",
       "      <th>street-address</th>\n",
       "      <th>city</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>0</th>\n",
       "      <td>51</td>\n",
       "      <td>Seaman</td>\n",
       "      <td>344-219-9130</td>\n",
       "      <td>3416 044674 28287</td>\n",
       "      <td>06/20</td>\n",
       "      <td>Chas Ratliff</td>\n",
       "      <td>1135 Hawthorne Concession road</td>\n",
       "      <td>Springfield</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>1</th>\n",
       "      <td>53</td>\n",
       "      <td>Geophysicist</td>\n",
       "      <td>(219) 049-8427</td>\n",
       "      <td>4315 9929 2231 3721</td>\n",
       "      <td>04/22</td>\n",
       "      <td>Wilbur Cannon</td>\n",
       "      <td>871 Bernal Heights Lane</td>\n",
       "      <td>Hallandale Beach</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>2</th>\n",
       "      <td>24</td>\n",
       "      <td>Hospital Worker</td>\n",
       "      <td>1-899-405-6481</td>\n",
       "      <td>4494 7877 3692 6723</td>\n",
       "      <td>03/22</td>\n",
       "      <td>Spring Case</td>\n",
       "      <td>820 St. Paul Street-Calvert Hill</td>\n",
       "      <td>Chesapeake</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>3</th>\n",
       "      <td>60</td>\n",
       "      <td>Osteopath</td>\n",
       "      <td>1-677-679-4179</td>\n",
       "      <td>4848 0637 1092 6516</td>\n",
       "      <td>12/25</td>\n",
       "      <td>Lou Galloway</td>\n",
       "      <td>1076 Ellis Viaduct</td>\n",
       "      <td>Kennesaw</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>4</th>\n",
       "      <td>37</td>\n",
       "      <td>Art Historian</td>\n",
       "      <td>514-516-9013</td>\n",
       "      <td>3455 426227 43867</td>\n",
       "      <td>06/17</td>\n",
       "      <td>Thaddeus Kirk</td>\n",
       "      <td>576 Candyland Nene</td>\n",
       "      <td>Peabody</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "   age       occupation       telephone   credit-card-number  \\\n",
       "0   51           Seaman    344-219-9130    3416 044674 28287   \n",
       "1   53     Geophysicist  (219) 049-8427  4315 9929 2231 3721   \n",
       "2   24  Hospital Worker  1-899-405-6481  4494 7877 3692 6723   \n",
       "3   60        Osteopath  1-677-679-4179  4848 0637 1092 6516   \n",
       "4   37    Art Historian    514-516-9013    3455 426227 43867   \n",
       "\n",
       "  credit-card-expiration           name                    street-address  \\\n",
       "0                  06/20   Chas Ratliff    1135 Hawthorne Concession road   \n",
       "1                  04/22  Wilbur Cannon           871 Bernal Heights Lane   \n",
       "2                  03/22    Spring Case  820 St. Paul Street-Calvert Hill   \n",
       "3                  12/25   Lou Galloway                1076 Ellis Viaduct   \n",
       "4                  06/17  Thaddeus Kirk                576 Candyland Nene   \n",
       "\n",
       "               city  \n",
       "0       Springfield  \n",
       "1  Hallandale Beach  \n",
       "2        Chesapeake  \n",
       "3          Kennesaw  \n",
       "4           Peabody  "
      ]
     },
     "execution_count": 21,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df = b.map(flatten).to_dataframe()\n",
    "df.head()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "Stonemason          25\n",
       "Nursing Manager     24\n",
       "Valve Technician    23\n",
       "Horticulturalist    23\n",
       "Theatrical Agent    23\n",
       "Landworker          22\n",
       "Mortician           22\n",
       "Blind Fitter        21\n",
       "Care Assistant      21\n",
       "Almoner             21\n",
       "Name: occupation, dtype: int64"
      ]
     },
     "execution_count": 22,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# dataframe操作\n",
    "df[df.age > 30].occupation.value_counts().nlargest(10).compute()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.shutdown()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}