{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Dask Bag\n", "\n", "Dask Bag excels at processing data that can be represented as a sequence of arbitrary inputs. We refer to this as \"messy\" data, because it can contain complex nested structures, missing fields, mixtures of data types, and so on.\n", "\n", "By default, `dask.bag` uses `dask.multiprocessing` for computation. The benefit is that Dask sidesteps the GIL and can use multiple cores on pure Python objects. The drawback is that Dask Bag performs poorly on computations that involve a lot of communication between workers. In this notebook, the `dask.distributed` `Client` created below becomes the default scheduler instead, so the work runs on a local cluster of worker processes." ] },
{ "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client, progress\n", "\n", "# Start a local cluster with 4 worker processes, each running a single thread\n", "client = Client(n_workers=4, threads_per_worker=1)\n", "client" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create random data\n", "\n", "Generate a set of random records and store them on disk as JSON." ] },
{ "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['/Users/kefei/demo/dask-learn/data/0.json',\n", " '/Users/kefei/demo/dask-learn/data/1.json',\n", " '/Users/kefei/demo/dask-learn/data/2.json',\n", " '/Users/kefei/demo/dask-learn/data/3.json',\n", " '/Users/kefei/demo/dask-learn/data/4.json',\n", " '/Users/kefei/demo/dask-learn/data/5.json',\n", " '/Users/kefei/demo/dask-learn/data/6.json',\n", " '/Users/kefei/demo/dask-learn/data/7.json',\n", " '/Users/kefei/demo/dask-learn/data/8.json',\n", " '/Users/kefei/demo/dask-learn/data/9.json']" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask\n", "import json\n", "import os\n", "\n", "os.makedirs('data', exist_ok=True)\n", "\n", "# Build a bag of random 'people' records (requires the optional mimesis package)\n", "b = dask.datasets.make_people()\n", "\n", "# Serialize each record to a JSON string and write one text file per partition\n", "b.map(json.dumps).to_textfiles('data/*.json')" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Read the JSON data" ] },
{ "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"age\": 51, \"name\": [\"Chas\", \"Ratliff\"], \"occupation\": \"Seaman\", \"telephone\": \"344-219-9130\", \"address\": {\"address\": \"1135 Hawthorne Concession road\", \"city\": \"Springfield\"}, \"credit-card\": {\"number\": \"3416 044674 28287\", \"expiration-date\": \"06/20\"}}\n", "{\"age\": 53, \"name\": [\"Wilbur\", \"Cannon\"], \"occupation\": \"Geophysicist\", \"telephone\": \"(219) 049-8427\", \"address\": {\"address\": \"871 Bernal Heights Lane\", \"city\": \"Hallandale Beach\"}, \"credit-card\": {\"number\": \"4315 9929 2231 3721\", \"expiration-date\": \"04/22\"}}\n" ] } ], "source": [ "!head -n 2 data/0.json" ] },
{ "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "dask.bag<loads, npartitions=20>" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import dask.bag as db\n", "import json\n", "\n", "# Read every matching file line by line, then parse each line as a JSON record\n", "b = db.read_text('data/*.json').map(json.loads)\n", "b" ] },
{ "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'age': 51,\n", " 'name': ['Chas', 'Ratliff'],\n", " 'occupation': 'Seaman',\n", " 'telephone': '344-219-9130',\n", " 'address': {'address': '1135 Hawthorne Concession road',\n", " 'city': 'Springfield'},\n", " 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},\n", " {'age': 53,\n", " 'name': ['Wilbur', 'Cannon'],\n", " 'occupation': 'Geophysicist',\n", " 'telephone': '(219) 049-8427',\n", " 'address': {'address': '871 Bernal Heights Lane',\n", " 'city': 'Hallandale Beach'},\n", " 'credit-card': {'number': '4315 9929 2231 3721',\n", " 'expiration-date': '04/22'}})" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Look at the first two records\n", "b.take(2)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Map, Filter, Aggregate" ] },
{ "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'age': 51,\n", " 'name': ['Chas', 'Ratliff'],\n", " 'occupation': 'Seaman',\n", " 'telephone': '344-219-9130',\n", " 'address': {'address': '1135 Hawthorne Concession road',\n", " 'city': 'Springfield'},\n", " 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},\n", " {'age': 53,\n", " 'name': ['Wilbur', 'Cannon'],\n", " 'occupation': 'Geophysicist',\n", " 'telephone': '(219) 049-8427',\n", " 'address': {'address': '871 Bernal Heights Lane',\n", " 'city': 'Hallandale Beach'},\n", " 'credit-card': {'number': '4315 9929 2231 3721',\n", " 'expiration-date': '04/22'}})" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Look at people older than 30\n", "b.filter(lambda record: record['age'] > 30).take(2)" ] },
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "('Seaman', 'Geophysicist')" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Extract the occupation field from each record\n", "b.map(lambda record: record['occupation']).take(2)" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "17025" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Count the records\n", "b.count().compute()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Chained computations" ] },
{ "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "dask.bag<topk-aggregate, npartitions=1>" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Find the 10 most common occupations among people older than 30.\n", "# frequencies() yields (occupation, count) pairs; topk(10, key=1) keeps the 10 pairs with the largest count.\n", "# This only builds a lazy task graph; nothing runs until .compute() is called.\n", "result = (b.filter(lambda record: record['age'] > 30)\n", "          .map(lambda record: record['occupation'])\n", "          .frequencies(sort=True)\n", "          .topk(10, key=1))\n", "result" ] },
{ "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Stonemason', 25),\n", " ('Nursing Manager', 24),\n", " ('Theatrical Agent', 23),\n", " ('Horticulturalist', 23),\n", " ('Valve Technician', 23),\n", " ('Landworker', 22),\n", " ('Mortician', 22),\n", " ('Blind Fitter', 21),\n", " ('Care Assistant', 21),\n", " ('Seamstress', 21)]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "result.compute()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Transform and store data" ] },
{ "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['/Users/kefei/demo/dask-learn/data/processed.00.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.01.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.02.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.03.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.04.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.05.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.06.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.07.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.08.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.09.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.10.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.11.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.12.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.13.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.14.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.15.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.16.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.17.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.18.json',\n", " '/Users/kefei/demo/dask-learn/data/processed.19.json']" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Keep people older than 30, serialize them back to JSON and write them to disk.\n", "# Caution: these files also match the 'data/*.json' pattern read earlier, so re-running\n", "# the notebook reads them back in as input; writing to a separate directory avoids this.\n", "(b.filter(lambda record: record['age'] > 30)\n", " .map(json.dumps)\n", " .to_textfiles('data/processed.*.json'))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Preprocess, then convert to a DataFrame\n", "\n", "Each record contains nested lists and dicts, so flatten every record into a single level of fields before converting the bag to a Dask DataFrame." ] },
{ "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'age': 51,\n", " 'name': ['Chas', 'Ratliff'],\n", " 'occupation': 'Seaman',\n", " 'telephone': '344-219-9130',\n", " 'address': {'address': '1135 Hawthorne Concession road',\n", " 'city': 'Springfield'},\n", " 'credit-card': {'number': '3416 044674 28287', 'expiration-date': '06/20'}},)" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "b.take(1)" ] },
{ "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "({'age': 51,\n", " 'occupation': 'Seaman',\n", " 'telephone': '344-219-9130',\n", " 'credit-card-number': '3416 044674 28287',\n", " 'credit-card-expiration': '06/20',\n", " 'name': 'Chas Ratliff',\n", " 'street-address': '1135 Hawthorne Concession road',\n", " 'city': 'Springfield'},)" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "def flatten(record):\n", "    # Pull the nested fields up to the top level and join the name parts into one string\n", "    return {\n", "        'age': record['age'],\n", "        'occupation': record['occupation'],\n", "        'telephone': record['telephone'],\n", "        'credit-card-number': record['credit-card']['number'],\n", "        'credit-card-expiration': record['credit-card']['expiration-date'],\n", "        'name': ' '.join(record['name']),\n", "        'street-address': record['address']['address'],\n", "        'city': record['address']['city']\n", "    }\n", "\n", "b.map(flatten).take(1)" ] },
{ "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/html": [ "<div>\n", "<style scoped>\n", " .dataframe tbody tr th:only-of-type {\n", " vertical-align: middle;\n", " }\n", "\n", " .dataframe tbody tr th {\n", " vertical-align: top;\n", " }\n", "\n", " .dataframe thead th {\n", " text-align: right;\n", " }\n", "</style>\n", "<table border=\"1\" class=\"dataframe\">\n", " <thead>\n", " <tr style=\"text-align: right;\">\n", " <th></th>\n", " <th>age</th>\n", " <th>occupation</th>\n", " <th>telephone</th>\n", " <th>credit-card-number</th>\n", " <th>credit-card-expiration</th>\n", " <th>name</th>\n", " <th>street-address</th>\n", " <th>city</th>\n", " </tr>\n", " </thead>\n", " <tbody>\n", " <tr>\n", " <th>0</th>\n", " <td>51</td>\n", " <td>Seaman</td>\n", " <td>344-219-9130</td>\n", " <td>3416 044674 28287</td>\n", " <td>06/20</td>\n", " <td>Chas Ratliff</td>\n", " <td>1135 Hawthorne Concession road</td>\n", " <td>Springfield</td>\n", " </tr>\n", " <tr>\n", " <th>1</th>\n", " <td>53</td>\n", " <td>Geophysicist</td>\n", " <td>(219) 049-8427</td>\n", " <td>4315 9929 2231 3721</td>\n", " <td>04/22</td>\n", " <td>Wilbur Cannon</td>\n", " <td>871 Bernal Heights Lane</td>\n", " <td>Hallandale Beach</td>\n", " </tr>\n", " <tr>\n", " <th>2</th>\n", " <td>24</td>\n", " <td>Hospital Worker</td>\n", " <td>1-899-405-6481</td>\n", " <td>4494 7877 3692 6723</td>\n", " <td>03/22</td>\n", " <td>Spring Case</td>\n", " <td>820 St. Paul Street-Calvert Hill</td>\n", " <td>Chesapeake</td>\n", " </tr>\n", " <tr>\n", " <th>3</th>\n", " <td>60</td>\n", " <td>Osteopath</td>\n", " <td>1-677-679-4179</td>\n", " <td>4848 0637 1092 6516</td>\n", " <td>12/25</td>\n", " <td>Lou Galloway</td>\n", " <td>1076 Ellis Viaduct</td>\n", " <td>Kennesaw</td>\n", " </tr>\n", " <tr>\n", " <th>4</th>\n", " <td>37</td>\n", " <td>Art Historian</td>\n", " <td>514-516-9013</td>\n", " <td>3455 426227 43867</td>\n", " <td>06/17</td>\n", " <td>Thaddeus Kirk</td>\n", " <td>576 Candyland Nene</td>\n", " <td>Peabody</td>\n", " </tr>\n", " </tbody>\n", "</table>\n", "</div>" ], "text/plain": [ " age occupation telephone credit-card-number \\\n", "0 51 Seaman 344-219-9130 3416 044674 28287 \n", "1 53 Geophysicist (219) 049-8427 4315 9929 2231 3721 \n", "2 24 Hospital Worker 1-899-405-6481 4494 7877 3692 6723 \n", "3 60 Osteopath 1-677-679-4179 4848 0637 1092 6516 \n", "4 37 Art Historian 514-516-9013 3455 426227 43867 \n", "\n", " credit-card-expiration name street-address \\\n", "0 06/20 Chas Ratliff 1135 Hawthorne Concession road \n", "1 04/22 Wilbur Cannon 871 Bernal Heights Lane \n", "2 03/22 Spring Case 820 St. Paul Street-Calvert Hill \n", "3 12/25 Lou Galloway 1076 Ellis Viaduct \n", "4 06/17 Thaddeus Kirk 576 Candyland Nene \n", "\n", " city \n", "0 Springfield \n", "1 Hallandale Beach \n", "2 Chesapeake \n", "3 Kennesaw \n", "4 Peabody " ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Flatten every record, then convert the bag to a Dask DataFrame\n", "df = b.map(flatten).to_dataframe()\n", "df.head()" ] },
{ "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Stonemason 25\n", "Nursing Manager 24\n", "Valve Technician 23\n", "Horticulturalist 23\n", "Theatrical Agent 23\n", "Landworker 22\n", "Mortician 22\n", "Blind Fitter 21\n", "Care Assistant 21\n", "Almoner 21\n", "Name: occupation, dtype: int64" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# The same top-10 query, written with DataFrame operations\n", "# (occupations tied at the cutoff may come out in a different order than in the bag version)\n", "df[df.age > 30].occupation.value_counts().nlargest(10).compute()" ] },
{ "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "client.shutdown()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }
], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 4 }