# Dask Array
![img](./images/dask-array-black-text.svg)

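A Dask array coordinates many NumPy arrays, arranged as chunks within a grid. Dask arrays support a large subset of the NumPy API.

Dask arrays use blocked algorithms to provide a parallel, larger-than-memory, n-dimensional array. In short: distributed NumPy.

- **Parallel**: uses all of the cores on your computer.

- **Larger-than-memory**: lets you work on datasets that are larger than available memory by breaking the array into many small chunks, operating on those chunks in an order that minimizes the memory footprint of the computation, and streaming data from disk efficiently.

- **Blocked algorithms**: perform a large computation by performing many smaller computations.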
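The blocked-algorithm idea in the list above can be sketched with plain NumPy. This only illustrates the principle and is not how Dask is implemented internally; the helper name `blocked_sum` and the block size are hypothetical choices made for this example.

```python
import numpy as np

def blocked_sum(values, block_size):
    """Sum a 1-D array by summing fixed-size blocks, then summing the partial results."""
    partial_sums = [
        values[start:start + block_size].sum()
        for start in range(0, len(values), block_size)
    ]
    return sum(partial_sums)

data = np.arange(1_000_000, dtype=np.float64)
total = blocked_sum(data, block_size=100_000)
print(total)
```

Each partial sum only needs one block in memory at a time, and the blocks are independent of each other, which is what makes it possible to process them in parallel.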

In [1]:
# Connect to / create a Dask cluster

from dask.distributed import Client

client = Client(n_workers=4, threads_per_worker=1)
client

Client: Scheduler: tcp://127.0.0.1:57436, Dashboard: http://127.0.0.1:8787/status
Cluster: Workers: 4, Cores: 4, Memory: 16.00 GiB


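## Example 1

**Sum a large array of one billion random numbers by divide and conquer**

Dask uses the `chunks` argument to split a large array into small blocks. It first computes on each block, then combines the per-block results to produce the final result.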
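As a small sketch of what the `chunks` argument does, the following uses a much smaller array than the one in this example so that it runs instantly. The array size and the use of `da.ones` are choices made for illustration; no cluster is assumed, so Dask falls back to its default local scheduler.

```python
import dask.array as da

# A 1-D array of 10,000 ones, split into blocks of 1,000 elements each.
x = da.ones((10_000,), chunks=(1_000,))

n_blocks = x.numblocks      # number of blocks along each axis
block_sizes = x.chunks      # size of every block along each axis

# Nothing has been computed yet; compute() runs the per-block sums and combines them.
total = x.sum().compute()

print(n_blocks, total)
```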

In [5]:
import dask.array as da
x = da.random.random((1_000_000_000,), chunks=(1_000_000,))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 7.45 GiB | 7.63 MiB |
| Shape | (1000000000,) | (1000000,) |
| Count | 1000 Tasks | 1000 Chunks |
| Type | float64 | numpy.ndarray |
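The sizes reported above follow from simple arithmetic. The sketch below reproduces them, assuming each `float64` element occupies 8 bytes; the variable names are made up for this example.

```python
# Assumption: each float64 element occupies 8 bytes.
BYTES_PER_FLOAT64 = 8

array_elements = 1_000_000_000
chunk_elements = 1_000_000

array_gib = array_elements * BYTES_PER_FLOAT64 / 2**30   # bytes -> GiB
chunk_mib = chunk_elements * BYTES_PER_FLOAT64 / 2**20   # bytes -> MiB
n_chunks = array_elements // chunk_elements

print(f"{array_gib:.2f} GiB, {chunk_mib:.2f} MiB, {n_chunks} chunks")
```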


In [6]:
result = x.sum()
result

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8.0 B |
| Shape | () | () |
| Count | 2334 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [8]:
%time result.compute()

CPU times: user 2.3 s, sys: 246 ms, total: 2.55 s
Wall time: 3.94 s


499998636.25802165

In [9]:
%%time
# NumPy run time

import numpy as np
x = np.random.random((1_000_000_000,))
x.sum()

CPU times: user 10.4 s, sys: 5.79 s, total: 16.2 s
Wall time: 16.9 s


499997790.0394625

## Example 2

1. **Build a 20000x20000 array of normally distributed random values, split into chunks of size 1000x1000**
2. **Take the mean along one axis**
3. **Take every 100th element**

In [10]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),
                              chunks=(1000, 1000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 2.98 GiB | 7.63 MiB |
| Shape | (20000, 20000) | (1000, 1000) |
| Count | 400 Tasks | 400 Chunks |
| Type | float64 | numpy.ndarray |


In [11]:
y = x.mean(axis=0)[::100]
y

| | Array | Chunk |
|---|---|---|
| Bytes | 1.56 kiB | 80 B |
| Shape | (200,) | (10,) |
| Count | 980 Tasks | 20 Chunks |
| Type | float64 | numpy.ndarray |


In [12]:
%%time
y.compute()

CPU times: user 1.44 s, sys: 227 ms, total: 1.67 s
Wall time: 3.85 s


array([ 9.99938546,  9.99971967,  9.99945469, 10.00064526,  9.99973264,
        9.99962011,  9.99943414, 10.00153313, 10.00006829,  9.9997891 ,
       10.00022229, 10.0000093 , 10.00028218,  9.99916192, 10.00037475,
        9.99818902,  9.99933171, 10.00078998,  9.99950032, 10.00068982,
       10.00030546,  9.99955212, 10.00072705,  9.9992586 , 10.00186248,
       10.00029279, 10.00080394, 10.00114623, 10.00060563,  9.99967053,
        9.99989176,  9.99997171,  9.99912124, 10.00074451, 10.0003222 ,
        9.99986045,  9.99945197,  9.99923763,  9.99958515,  9.99886138,
       10.00146995,  9.99947912,  9.99910516, 10.0005532 , 10.0011445 ,
        9.99997232, 10.00026516,  9.9996024 ,  9.99981061, 10.00036216,
       10.00044439,  9.99971844, 10.00013288,  9.99933069, 10.00047139,
       10.0002093 , 10.0003827 ,  9.99924716,  9.99981332, 10.00164159,
        9.99972159, 10.00005599,  9.99859007, 10.00030065, 10.00181599,
        9.99982092,  9.99857654,  9.99867371, 10.00005552, 10.00

- How does changing the chunk size affect execution?
    - chunks = (10000, 10000)
    - chunks = (25, 25)
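One way to explore this question is to count the chunks and graph tasks that each chunking produces. The sketch below is scaled down to a 2000x2000 array so it runs quickly; the helper name `describe` and the chunk sizes tried are hypothetical, and exact task counts may differ between Dask versions.

```python
import dask.array as da

def describe(chunks, shape=(2000, 2000)):
    """Return (number of chunks, number of tasks in the graph) for one chunking."""
    x = da.random.normal(10, 0.1, size=shape, chunks=chunks)
    y = x.mean(axis=0)[::100]
    return x.npartitions, len(y.__dask_graph__())

for chunks in [(2000, 2000), (500, 500), (50, 50)]:
    n_chunks, n_tasks = describe(chunks)
    print(f"chunks={chunks}: {n_chunks} chunks, {n_tasks} tasks")
```

A single huge chunk leaves nothing to run in parallel, while very small chunks create many tasks, and the scheduling overhead per task can then outweigh the work each task does.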

In [13]:
import numpy as np
import dask.array as da

x = da.random.normal(10, 0.1, size=(20000, 20000),
                              chunks=(20000, 20000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 2.98 GiB | 2.98 GiB |
| Shape | (20000, 20000) | (20000, 20000) |
| Count | 1 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [14]:
y = x.mean(axis=0)[::100]
%time y.compute()

CPU times: user 681 ms, sys: 74.4 ms, total: 756 ms
Wall time: 11.4 s


array([10.0001655 , 10.00167621,  9.99923351, 10.00064734, 10.00091432,
       10.00047372, 10.00017035,  9.99953996,  9.99968177,  9.99857899,
       10.00003809, 10.00108116,  9.99900785, 10.00027567,  9.99999848,
       10.00082341, 10.00129353, 10.00030403, 10.00066804,  9.99966315,
        9.99924217, 10.00144793, 10.00020324, 10.00086305, 10.00061989,
       10.00084032,  9.99839684, 10.00006433,  9.99977552, 10.00069765,
        9.99952445, 10.00001131, 10.00008032,  9.99987475,  9.99992465,
        9.99989691,  9.99953196, 10.00119205,  9.99926572, 10.00054107,
       10.00015775, 10.0002149 ,  9.9994878 ,  9.99994616,  9.99979669,
       10.00072395,  9.99848976, 10.00013652,  9.99915904,  9.99975547,
        9.99919632, 10.00161185,  9.99997359, 10.00021391,  9.99981143,
        9.99902299,  9.99928706,  9.99987627,  9.99995705,  9.99941156,
        9.99964318, 10.00022528,  9.99921117, 10.00056046,  9.99894001,
        9.99892346,  9.99943849, 10.00010026,  9.99940094,  9.99

In [15]:
%%time
# NumPy computation time
import numpy as np

x = np.random.normal(10, 0.1, size=(20000, 20000))
x.mean(axis=0)[::100]

CPU times: user 10.5 s, sys: 811 ms, total: 11.3 s
Wall time: 10.9 s


array([ 9.99922827, 10.00026646,  9.99968943, 10.00016506,  9.99994516,
       10.00061448,  9.99965795,  9.99985347,  9.99886232, 10.00058193,
       10.00030265,  9.9998264 ,  9.99979297,  9.99964179,  9.99929195,
       10.00065933,  9.99967126,  9.99955685,  9.99986639,  9.99907462,
       10.0005034 , 10.00054594,  9.99936214,  9.99982118,  9.99991934,
        9.99913666,  9.99982351,  9.99973586,  9.99888925, 10.00027446,
        9.99964081, 10.00068381, 10.00094463,  9.99963125,  9.99938888,
       10.00004541, 10.00254318, 10.00053102,  9.99977105,  9.99977276,
        9.99957297,  9.99988611, 10.00120295, 10.000437  ,  9.99979155,
        9.99967142, 10.00020331, 10.00116383, 10.00033801, 10.00021838,
       10.00012446,  9.99951103,  9.99916995,  9.99902008, 10.00034422,
       10.00001313,  9.99980868, 10.00037233, 10.0009992 ,  9.99919649,
        9.99879175, 10.00140597,  9.99973817, 10.00015065,  9.99917851,
        9.99909349, 10.00018382, 10.00004312, 10.00032093,  9.99

## Example 3

**Persist computed results in memory to speed up subsequent computations**

In [16]:
import dask.array as da
x = da.random.random((10000, 10000), chunks=(1000, 1000))
x

| | Array | Chunk |
|---|---|---|
| Bytes | 762.94 MiB | 7.63 MiB |
| Shape | (10000, 10000) | (1000, 1000) |
| Count | 100 Tasks | 100 Chunks |
| Type | float64 | numpy.ndarray |


In [17]:
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z

| | Array | Chunk |
|---|---|---|
| Bytes | 39.06 kiB | 3.91 kiB |
| Shape | (5000,) | (500,) |
| Count | 430 Tasks | 10 Chunks |
| Type | float64 | numpy.ndarray |


In [19]:
%time z.compute()

CPU times: user 278 ms, sys: 23.3 ms, total: 302 ms
Wall time: 514 ms


array([1.01400246, 0.99859011, 1.00048605, ..., 1.00554574, 1.00343839,
       1.00203487])

In [20]:
# Persist the result in memory
y = y.persist()

In [21]:
%time y[0, 0].compute()

CPU times: user 12.1 ms, sys: 2.99 ms, total: 15.1 ms
Wall time: 14.1 ms


0.0832880607724471

In [22]:
%time y[0, 0].compute()

CPU times: user 7.52 ms, sys: 1.94 ms, total: 9.47 ms
Wall time: 10.1 ms


0.0832880607724471
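The persist pattern used above can also be tried without a cluster. The following is a minimal sketch on a smaller array, assuming no distributed `Client` is active, in which case `persist()` runs on Dask's default local scheduler and blocks until the chunks are computed. The name `y_persisted` is made up for this example.

```python
import dask.array as da

x = da.random.random((2000, 2000), chunks=(500, 500))
y = x + x.T

# persist() computes every chunk of y and keeps the results in memory;
# the returned array has the same shape and chunking as the lazy one.
y_persisted = y.persist()

# Later operations start from the stored chunks instead of regenerating x.
first = float(y_persisted[1, 2].compute())
second = float(y_persisted[2, 1].compute())

print(first, second)
```

Because `y` is `x + x.T`, it is symmetric, so the two printed values should match. Keep in mind that a persisted array occupies memory until it is released.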

In [23]:
# Free the memory held by the persisted array
client.cancel(y)
# or del y

In [24]:
client.shutdown()