## unyt_dask arrays

This notebook demonstrates the latest version of the `unyt_dask_array` implementation at https://github.com/chrishavlin/unyt/tree/dask_unyt 

This implementation adds dask as an optional dependency to `unyt` and subclasses `dask.array.core.Array` to create a unit-aware array that keeps dask's lazy, chunked evaluation.

The main access point is through the `unyt_from_dask` function, which takes a dask array and user-specified units information to create a `unyt_dask_array` object:

In [1]:
from unyt.dask_array import unyt_from_dask
from dask import array as dask_array

x = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'm')
x

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 800.00 MB      | 8.00 MB       |
| Shape | (10000, 10000) | (1000, 1000)  |
| Count | 100 Tasks      | 100 Chunks    |
| Type  | float64        | numpy.ndarray |
| Units | m              | m             |


This array behaves like a dask array. Applying operations only builds the dask task graph, and nothing is computed yet:


In [3]:
result = (x * 2).mean()
result

|       | Array     | Chunk         |
|-------|-----------|---------------|
| Bytes | 8 B       | 8 B           |
| Shape | ()        | ()            |
| Count | 339 Tasks | 1 Chunks      |
| Type  | float64   | numpy.ndarray |
| Units | m         | m             |


and when we execute that graph, we get back a regular, in-memory `unyt_quantity` or `unyt_array`, depending on the number of elements returned:

In [4]:
result.compute()

unyt_quantity(1.00004815, 'm')

In [5]:
result = (x * 2).mean(1)
result.compute()

unyt_array([0.99678626, 0.99558319, 1.00601329, ..., 1.00921666,
            0.99226075, 1.00205869], 'm')
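The lazy pattern above can be modeled in a few lines of plain Python. This is a toy and not the unyt or dask implementation; the `LazyChunked` class and its methods are hypothetical. Each elementwise operation records one new task per chunk instead of computing anything, and the recorded tasks only run when `compute()` is called. Dask's task counts grow the same way for elementwise operations: adding a `unyt_quantity` in a later cell takes the count from 100 to 200 tasks. The toy returns a plain `(values, units)` tuple where unyt returns a `unyt_quantity` or `unyt_array`.

```python
class LazyChunked:
    """Toy lazy array (hypothetical): chunk producers plus a units label."""

    def __init__(self, chunks, units, n_tasks=None):
        self.chunks = chunks  # each entry is a zero-argument function returning one chunk
        self.units = units
        # tasks recorded so far: one per chunk when the array is created
        self.n_tasks = len(chunks) if n_tasks is None else n_tasks

    def map_chunks(self, func):
        # wrap each chunk producer in a new task; nothing is evaluated here
        new_chunks = [lambda c=c: func(c()) for c in self.chunks]
        return LazyChunked(new_chunks, self.units, self.n_tasks + len(self.chunks))

    def __mul__(self, scalar):
        # multiplying by a plain number leaves the units unchanged
        return self.map_chunks(lambda chunk: [v * scalar for v in chunk])

    def compute(self):
        # only now are the recorded tasks executed, chunk by chunk
        values = [v for produce in self.chunks for v in produce()]
        return values, self.units


calls = []


def make_chunk(i):
    def load():
        calls.append(i)  # record that chunk i was actually produced
        return [float(i), float(i) + 0.5]
    return load


x = LazyChunked([make_chunk(i) for i in range(4)], "m")
y = x * 2
print(y.n_tasks)  # 8: 4 creation tasks + 4 multiply tasks
print(calls)      # []: nothing has been evaluated yet
values, units = y.compute()
print(values, units)  # [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0] m
print(calls)          # [0, 1, 2, 3]
```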

Adding or subtracting follows unyt behavior. Both operands must have the same dimensions, and a plain number is treated as dimensionless. To add a constant to `x`, which is in meters, wrap the constant in a `unyt_quantity`:

In [6]:
# this will error
result = x + 2

UnitOperationError: The <ufunc 'add'> operator for unyt_arrays with units "m" (dimensions "(length)") and "dimensionless" (dimensions "1") is not well defined.

In [7]:
from unyt import unyt_quantity

result = x + unyt_quantity(10, 'm')
result

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 800.00 MB      | 8.00 MB       |
| Shape | (10000, 10000) | (1000, 1000)  |
| Count | 200 Tasks      | 100 Chunks    |
| Type  | float64        | numpy.ndarray |
| Units | m              | m             |


In [8]:
result.mean().compute()

unyt_quantity(10.50002407, 'm')
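The error in the first of these cells comes from a dimension check. Addition is only defined when both operands have the same dimensions, and a bare number counts as dimensionless (`"1"` in the message). Below is a minimal plain-Python model of that rule. The `Quantity` class, its `dims` strings and the local `UnitOperationError` are hypothetical stand-ins that mimic the behavior shown; they do not reproduce unyt's internals.

```python
class UnitOperationError(Exception):
    """Hypothetical stand-in for unyt's exception of the same name."""


class Quantity:
    def __init__(self, value, units, dims):
        self.value = value
        self.units = units
        self.dims = dims  # e.g. "(length)", or "1" for dimensionless

    def __add__(self, other):
        # bare numbers are treated as dimensionless quantities
        if not isinstance(other, Quantity):
            other = Quantity(other, "dimensionless", "1")
        if other.dims != self.dims:
            raise UnitOperationError(
                f"cannot add {self.units!r} ({self.dims}) and "
                f"{other.units!r} ({other.dims})"
            )
        if other.units != self.units:
            # unyt would convert here; this sketch deliberately does not
            raise NotImplementedError("unit conversion is omitted in this sketch")
        return Quantity(self.value + other.value, self.units, self.dims)


x = Quantity(0.5, "m", "(length)")
try:
    x + 2  # length + dimensionless: rejected
except UnitOperationError as err:
    print("error:", err)

y = x + Quantity(10, "m", "(length)")
print(y.value, y.units)  # 10.5 m
```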

Following normal unyt behavior, `unyt_dask_array` converts operands that have the same dimensions but different units before calculating:

In [11]:
result = x + unyt_quantity(1000, 'cm')
result.mean().compute()

unyt_quantity(10.50002407, 'm')

In [12]:
result = x.to('km') + unyt_quantity(1000, 'cm')
result.mean().compute().to('m')

unyt_quantity(10.50002407, 'm')
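These results can be checked by hand. `dask_array.random.random` draws values uniformly from [0, 1), so the mean of `x` is about 0.5 m. Because 1000 cm is 10 m, both cells should give roughly 10.5 m, and they do. The sketch below models the conversion in plain Python with a hypothetical table of scale factors. In this sketch the right operand is always rescaled into the left operand's units. That is a choice made for illustration and not a statement about which operand unyt converts.

```python
# hypothetical conversion table: metres per unit
METRES_PER = {"m": 1.0, "cm": 0.01, "km": 1000.0}


def convert(value, from_units, to_units):
    # rescale through metres as the common reference unit
    return value * METRES_PER[from_units] / METRES_PER[to_units]


def add(a, a_units, b, b_units):
    # sketch's rule: convert the right operand into the left operand's units
    return a + convert(b, b_units, a_units), a_units


mean_x = 0.5  # approximate mean of values uniform on [0, 1), in metres

total_m, units_m = add(mean_x, "m", 1000, "cm")
print(total_m, units_m)  # 10.5 m

# same sum with x first converted to km, then the result converted back to m
total_km, units_km = add(convert(mean_x, "m", "km"), "km", 1000, "cm")
print(convert(total_km, units_km, "m"))
```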

The same conversion applies when combining multiple `unyt_dask_array` objects:

In [15]:
x1 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'm')
x2 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'cm')
x3 = unyt_from_dask(dask_array.random.random((10000,10000), chunks=(1000,1000)), 'km')

x4 = (x1 * x2 + x3 * x2) / x1
x4

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 800.00 MB      | 8.00 MB       |
| Shape | (10000, 10000) | (1000, 1000)  |
| Count | 1000 Tasks     | 100 Chunks    |
| Type  | float64        | numpy.ndarray |
| Units | km             | km            |


In [16]:
x4.mean().compute()

unyt_quantity(5.565572e-05, 'km')
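The dimensions of `x4` can be checked by hand, whatever units the intermediate terms carry. Each product `x1 * x2` and `x3 * x2` has dimensions of length squared, and dividing by `x1` leaves length. That is why the result comes back in a length unit (km here). The small sketch below makes that bookkeeping explicit by tracking dimension exponents in a dict. The helper names are hypothetical, and the sketch ignores scale factors entirely.

```python
from collections import Counter

LENGTH = {"length": 1}


def mul(a, b):
    # multiplying quantities adds dimension exponents
    out = Counter(a)
    out.update(b)
    return {k: v for k, v in out.items() if v != 0}


def div(a, b):
    # dividing quantities subtracts dimension exponents
    out = Counter(a)
    out.subtract(b)
    return {k: v for k, v in out.items() if v != 0}


def add(a, b):
    # addition requires identical dimensions and preserves them
    if a != b:
        raise ValueError(f"incompatible dimensions: {a} vs {b}")
    return dict(a)


x1 = x2 = x3 = LENGTH  # m, cm and km are all lengths
x4_dims = div(add(mul(x1, x2), mul(x3, x2)), x1)
print(x4_dims)  # {'length': 1}
```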

If a `dask.distributed` `Client` is active, the computations are executed on that client's cluster:

In [30]:
from dask.distributed import Client
client = Client(threads_per_worker=4, n_workers=1)

In [31]:
client


| Client                                  | Cluster          |
|-----------------------------------------|------------------|
| Scheduler: tcp://127.0.0.1:43359        | Workers: 1       |
| Dashboard: http://127.0.0.1:8787/status | Cores: 4         |
|                                         | Memory: 33.51 GB |


In [34]:
x_da = unyt_from_dask(dask_array.random.random((10000, 10000), chunks=(1000, 1000)), 'm')

In [35]:
x_da.min()

|       | Array     | Chunk         |
|-------|-----------|---------------|
| Bytes | 8 B       | 8 B           |
| Shape | ()        | ()            |
| Count | 239 Tasks | 1 Chunks      |
| Type  | float64   | numpy.ndarray |
| Units | m         | m             |


In [36]:
x_da.to('km')

|       | Array          | Chunk         |
|-------|----------------|---------------|
| Bytes | 800.00 MB      | 8.00 MB       |
| Shape | (10000, 10000) | (1000, 1000)  |
| Count | 200 Tasks      | 100 Chunks    |
| Type  | float64        | numpy.ndarray |
| Units | km             | km            |


In [39]:
x_da.to('km').max().compute()

unyt_quantity(0.001, 'km')
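Reductions like `min()`, `max()` and `mean()` on a chunked array can be evaluated as one small task per chunk, followed by a combine step over the partial results. The plain-Python sketch below models that general pattern with hypothetical helper names; dask's own reduction machinery is more elaborate. The sketch also checks the last result. Multiplying every element by the positive factor 0.001 km/m does not change which element is largest. So the maximum of values just under 1 m is just under 0.001 km, which is what the cell above shows.

```python
def chunked_max(chunks):
    # one task per chunk, then a combine step over the partial maxima
    return max(max(chunk) for chunk in chunks)


def chunked_mean(chunks):
    # per-chunk (sum, count) pairs; averaging per-chunk means instead
    # would be wrong when chunks have different sizes
    partials = [(sum(chunk), len(chunk)) for chunk in chunks]
    total = sum(s for s, _ in partials)
    count = sum(n for _, n in partials)
    return total / count


KM_PER_M = 0.001  # factor applied to every element when converting m to km

chunks_m = [[0.2, 0.9], [0.999, 0.4, 0.5], [0.1]]
chunks_km = [[v * KM_PER_M for v in chunk] for chunk in chunks_m]

print(chunked_max(chunks_m))   # 0.999
print(chunked_max(chunks_km))  # just under 0.001
# 3.5; the mean of the per-chunk means would be about 3.83 here
print(chunked_mean([[1.0, 2.0], [3.0, 4.0, 5.0], [6.0]]))
```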