# Dask DataFrames

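Dask DataFrames coordinate many Pandas DataFrames/Series arranged along the index. A Dask DataFrame is partitioned row-wise, grouping rows by index value for efficiency. These Pandas objects may live on disk or on other machines.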

<img src="./images/dask-dataframe.svg" alt="img" width=400 />
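The row-wise partitioning described above can be illustrated with plain pandas. This is a sketch of the idea, not Dask's implementation. One logical table is held as a list of smaller pandas DataFrames, each covering a contiguous range of a sorted index. The boundary values are kept alongside (Dask calls them "divisions"). The helper `split_by_index` and the toy data are hypothetical.

```python
import numpy as np
import pandas as pd

# Toy "full" table with a sorted datetime index (hypothetical data).
index = pd.date_range("2000-01-01", periods=12, freq="6h", name="timestamp")
full = pd.DataFrame({"x": np.arange(12, dtype="float64")}, index=index)


def split_by_index(frame, boundaries):
    """Split a sorted frame into row partitions at the given index boundaries.

    Partition i holds rows with boundaries[i] <= index < boundaries[i + 1];
    the last partition also includes its upper bound.
    """
    parts = []
    for i, (lo, hi) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        is_last = i == len(boundaries) - 2
        upper = (frame.index <= hi) if is_last else (frame.index < hi)
        parts.append(frame[(frame.index >= lo) & upper])
    return parts


# Daily boundaries: Jan 1, 2, 3, 4 -> three partitions.
boundaries = list(pd.date_range("2000-01-01", "2000-01-04", freq="1D"))
partitions = split_by_index(full, boundaries)

# A reduction can run on each partition independently, then be combined.
total = sum(part["x"].sum() for part in partitions)
print(len(partitions), [len(p) for p in partitions], total)
```

Each partition is an ordinary pandas object. Because of that, per-partition work can be spread across threads, processes, or machines.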

Pandas is great for tabular datasets that fit in memory. Dask becomes useful when the dataset you want to analyze is larger than your machine's RAM.

```python
from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=2, memory_limit='1GB')
client
```

```
Client
  Scheduler: tcp://127.0.0.1:58039
  Dashboard: http://127.0.0.1:8787/status
Cluster
  Workers: 2
  Cores: 4
  Memory: 1.86 GiB
```
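The cluster summary follows from the `Client` arguments. `n_workers × threads_per_worker` gives the thread ("core") count. `memory_limit` applies per worker, so the total is 2 × 1 GB. The summary reports binary units, which is why 2 GB appears as 1.86 GiB. Here is a quick check of that arithmetic, assuming `'1GB'` is parsed as 10**9 bytes:

```python
n_workers = 2
threads_per_worker = 2
memory_limit_bytes = 1_000_000_000  # '1GB' taken as 10**9 bytes (decimal unit), per worker

cores = n_workers * threads_per_worker
total_bytes = n_workers * memory_limit_bytes
total_gib = total_bytes / 2**30  # convert to binary gibibytes, as the summary does

print(cores, f"{total_gib:.2f} GiB")
```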


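## Create random time-series data
Create a random time series of data with the following properties:

1. It stores one record per second for January 2000 (2000-01-01 through 2000-01-31).

2. It is split by day, keeping each day as a separate Pandas DataFrame (30 partitions).

3. Along with the datetime index, it has name, id, and numeric columns.

This is a small dataset of about 240 MB. Extend the date range or shorten the sampling interval to practice with a larger dataset.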
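The layout of this dataset can be reproduced in plain pandas. This sketch assumes the `dask.datasets.timeseries` defaults `start='2000-01-01'`, `end='2000-01-31'`, `freq='1s'`, and `partition_freq='1d'`. It also assumes `id` is Poisson-distributed around 1000 and `x`/`y` are uniform on [-1, 1], which matches the outputs shown in this tutorial. `make_partition` and its shortened name list are hypothetical stand-ins, not Dask's generator code. Thirty-one daily boundaries give the `npartitions=30` that appears when `df` is displayed.

```python
import numpy as np
import pandas as pd

# Assumed defaults: one row per second, one partition per day.
START, END, FREQ, PARTITION_FREQ = "2000-01-01", "2000-01-31", "1s", "1D"

# Partition boundaries ("divisions"): 31 daily timestamps -> 30 partitions.
divisions = pd.date_range(START, END, freq=PARTITION_FREQ)
npartitions = len(divisions) - 1

NAMES = np.array(["Alice", "Bob", "Charlie", "Dan"])  # hypothetical, shortened list


def make_partition(lo, hi, seed):
    """Hypothetical generator for one partition covering [lo, hi)."""
    rng = np.random.default_rng(seed)
    index = pd.date_range(lo, hi, freq=FREQ, inclusive="left", name="timestamp")
    n = len(index)
    return pd.DataFrame(
        {
            "id": rng.poisson(1000, n),
            "name": rng.choice(NAMES, n),
            "x": rng.uniform(-1, 1, n),
            "y": rng.uniform(-1, 1, n),
        },
        index=index,
    )


first = make_partition(divisions[0], divisions[1], seed=0)
print(npartitions, len(first))
print(first.dtypes)
```

Changing `FREQ` to a shorter interval or moving `END` later grows the data. This mirrors the advice above for practicing on larger inputs.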

```python
import dask
import dask.dataframe as dd
df = dask.datasets.timeseries()
```

```python
# Dask DataFrames are lazy, so no data is printed here
df
```

| npartitions=30 | id | name | x | y |
|---|---|---|---|---|
| 2000-01-01 | int64 | object | float64 | float64 |
| 2000-01-02 | ... | ... | ... | ... |
| ... | ... | ... | ... | ... |
| 2000-01-30 | ... | ... | ... | ... |
| 2000-01-31 | ... | ... | ... | ... |


```python
df.dtypes
```

```
id        int64
name     object
x       float64
y       float64
dtype: object
```
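Dask can report `dtypes` without computing anything because each collection keeps a zero-row pandas object describing its schema (its "meta"). Below is a pandas-only analogue of that idea. The single partition it uses is hypothetical data with the same columns. Running operations on the empty frame reveals the result's schema without touching real rows.

```python
import pandas as pd

# Hypothetical single partition with the same columns as the timeseries dataset.
partition = pd.DataFrame(
    {"id": [1000, 998], "name": ["Alice", "Bob"], "x": [0.1, -0.4], "y": [0.5, 0.2]}
)

# Zero rows, same columns and dtypes: enough to answer .dtypes without data.
meta = partition.iloc[:0]

# Running the same operations on the empty frame yields the result's schema.
filtered_meta = meta[meta.y > 0]
std_meta = filtered_meta.groupby("name").x.std()

print(meta.dtypes)
print(len(meta), std_meta.dtype)
```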

## Use standard Pandas operations

```python
# Standard deviation of x for each name, over rows where y > 0
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3
```

```
Dask Series Structure:
npartitions=1
    float64
        ...
Name: x, dtype: float64
Dask Name: sqrt, 157 tasks
```
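A grouped standard deviation over many partitions can be computed in two stages. First, each partition reports a per-group count, sum, and sum of squares. Then a final step combines those partial results. This is consistent with the lazy result above: many tasks, a single output partition, and a final `sqrt` step. The pandas-only sketch below shows one such decomposition on hypothetical data. Dask's own implementation differs in detail. The plain sum-of-squares formula is chosen for brevity and can lose precision when the data's mean is large.

```python
import numpy as np
import pandas as pd

# Hypothetical data with the columns this computation needs.
rng = np.random.default_rng(42)
full = pd.DataFrame(
    {
        "name": rng.choice(["Alice", "Bob", "Charlie"], 3000),
        "x": rng.uniform(-1, 1, 3000),
        "y": rng.uniform(-1, 1, 3000),
    }
)
# Split into 4 row partitions of 750 rows each.
partitions = [full.iloc[i : i + 750] for i in range(0, 3000, 750)]


def chunk(part):
    """Per-partition step: filter rows, then per-group count, sum, and sum of squares."""
    kept = part[part["y"] > 0]
    grouped = kept.groupby("name")["x"]
    return pd.DataFrame(
        {
            "n": grouped.count(),
            "s": grouped.sum(),
            "ss": (kept["x"] ** 2).groupby(kept["name"]).sum(),
        }
    )


def combine(partials):
    """Final step: add partial sums per group, then derive the sample std (ddof=1)."""
    total = pd.concat(partials).groupby(level=0).sum()
    var = (total["ss"] - total["s"] ** 2 / total["n"]) / (total["n"] - 1)
    return np.sqrt(var)


result = combine([chunk(p) for p in partitions])
expected = full[full["y"] > 0].groupby("name")["x"].std()
print(np.allclose(result.sort_index().values, expected.sort_index().values))
```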

```python
%time computed_df = df3.compute()
```

```
CPU times: user 185 ms, sys: 19.9 ms, total: 204 ms
Wall time: 1.28 s
```


```python
computed_df
```

```
name
Alice       0.577885
Bob         0.576743
Charlie     0.579827
Dan         0.577208
Edith       0.577062
Frank       0.576297
George      0.575883
Hannah      0.576153
Ingrid      0.580452
Jerry       0.574682
Kevin       0.577536
Laura       0.578090
Michael     0.577965
Norbert     0.578714
Oliver      0.577328
Patricia    0.577173
Quinn       0.577750
Ray         0.576110
Sarah       0.576397
Tim         0.577916
Ursula      0.576522
Victor      0.578701
Wendy       0.576590
Xavier      0.576980
Yvonne      0.576429
Zelda       0.576003
Name: x, dtype: float64
```
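Every group's value is close to 0.577. That is what one would expect under two assumptions about how `dask.datasets.timeseries` generates the data. The first is that `x` is drawn uniformly from $[-1, 1]$. The second is that `x` is independent of `y`, so filtering on `y > 0` leaves the distribution of `x` unchanged. For a uniform distribution on $[a, b]$:

$$
\operatorname{Var}(X) = \frac{(b-a)^2}{12} = \frac{\bigl(1-(-1)\bigr)^2}{12} = \frac{1}{3},
\qquad
\sigma = \frac{1}{\sqrt{3}} \approx 0.5774
$$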

## Persist data in memory

```python
df = df.persist()
```

```python
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
%time df3.compute()
```

```
CPU times: user 133 ms, sys: 12 ms, total: 145 ms
Wall time: 494 ms
```

```
name
Alice       0.577885
Bob         0.576743
Charlie     0.579827
Dan         0.577208
Edith       0.577062
Frank       0.576297
George      0.575883
Hannah      0.576153
Ingrid      0.580452
Jerry       0.574682
Kevin       0.577536
Laura       0.578090
Michael     0.577965
Norbert     0.578714
Oliver      0.577328
Patricia    0.577173
Quinn       0.577750
Ray         0.576110
Sarah       0.576397
Tim         0.577916
Ursula      0.576522
Victor      0.578701
Wendy       0.576590
Xavier      0.576980
Yvonne      0.576429
Zelda       0.576003
Name: x, dtype: float64
```
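The speed-up comes from where the work starts. Without `persist`, each `compute()` re-runs the whole task graph, including generating the random partitions. After `persist()`, the partitions already sit in worker memory, so only the filter and groupby run. The stdlib-only sketch below mimics that difference with a call counter. `load_partitions` is a hypothetical stand-in for the expensive source step, and the sketch is not how Dask implements persistence.

```python
calls = {"load": 0}


def load_partitions():
    """Hypothetical expensive source step (generating or reading the data)."""
    calls["load"] += 1
    return [list(range(i, i + 1000)) for i in range(0, 10_000, 1000)]


def analyse(partitions):
    """Cheap downstream work: filter each partition, reduce it, combine the results."""
    return sum(sum(v for v in part if v % 2 == 0) for part in partitions)


# Lazy style: every "compute" rebuilds the source from scratch.
lazy_results = [analyse(load_partitions()) for _ in range(2)]
calls_without_persist = calls["load"]

# Persist style: materialise once, then reuse the in-memory partitions.
calls["load"] = 0
persisted = load_partitions()
persisted_results = [analyse(persisted) for _ in range(2)]
calls_with_persist = calls["load"]

print(calls_without_persist, calls_with_persist, lazy_results == persisted_results)
```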

## Set the index

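`set_index` sorts the data by the index column. This enables faster lookups, joins, groupby-apply operations, and so on.

However, sorting data in parallel can be expensive, so set the index when it pays off, and do it rarely.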
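Sorting pays off because Dask records the index value at each partition boundary (the divisions, shown as the `Alice ... Zelda` row labels when `df` is displayed). A lookup such as `df.loc['Alice']` then only needs the partitions whose range can contain the key. Below is a stdlib sketch of that pruning. It uses hypothetical, strictly increasing divisions. Dask's handling of edge cases, such as repeated boundary values, is more involved.

```python
from bisect import bisect_right

# Hypothetical divisions for 5 partitions sorted by name.
# Partition i holds divisions[i] <= key < divisions[i + 1];
# the last partition also includes its upper bound.
divisions = ["Alice", "Edith", "Jerry", "Oliver", "Ursula", "Zelda"]


def partition_for(key, divisions):
    """Return the index of the only partition that can hold `key`, or None."""
    if key < divisions[0] or key > divisions[-1]:
        return None
    if key == divisions[-1]:
        return len(divisions) - 2
    return bisect_right(divisions, key) - 1


for name in ["Alice", "Kevin", "Zelda", "Aaron"]:
    print(name, partition_for(name, divisions))
```

The binary search costs O(log n) in the number of partitions. All other partitions are skipped entirely, which is why the indexed lookup is fast.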

```python
# Sort the data by name and make it the index
df = df.set_index('name')
df
```

| npartitions=30 | id | x | y |
|---|---|---|---|
| Alice | int64 | float64 | float64 |
| Alice | ... | ... | ... |
| ... | ... | ... | ... |
| Zelda | ... | ... | ... |
| Zelda | ... | ... | ... |


```python
df = df.persist()
```

```python
# Lookups by a value of the name index are now fast

%time df.loc['Alice'].compute()
```

```
CPU times: user 39.2 ms, sys: 11.7 ms, total: 51 ms
Wall time: 88 ms
```


| name | id | x | y |
|---|---|---|---|
| Alice | 928 | -0.523920 | 0.226035 |
| Alice | 986 | -0.170906 | 0.354172 |
| Alice | 931 | -0.499495 | 0.417713 |
| Alice | 1003 | 0.828322 | -0.492837 |
| Alice | 987 | 0.580199 | 0.944183 |
| ... | ... | ... | ... |
| Alice | 1036 | -0.247774 | -0.955866 |
| Alice | 1028 | -0.280042 | -0.701543 |
| Alice | 1037 | 0.361707 | 0.656865 |
| Alice | 970 | 0.702504 | -0.732269 |


```python
client.cancel(df)
```

```python
client.shutdown()
```

