# Parallelize with Dask

**This notebooks requires a dask cluster, which is not always available. We provide it during special tutorial events and sometimes at other times.**

In this notebook you will:

* Use ``dask`` to parallelize computations on images.

Recommended Prerequisites:

* [Handling Images](./Handling%20Images.ipynb)

[Dask Dashboard](http://ae8cfeafc5ad611e88b00021c84f1ed3-1000724570.us-east-1.elb.amazonaws.com/status)

In [None]:
SCHEDULER_ADDRESS = 'ae8cfeafc5ad611e88b00021c84f1ed3-1000724570.us-east-1.elb.amazonaws.com:8786'

from dask.distributed import Client
cli = Client(SCHEDULER_ADDRESS)

## Compare `numpy` and `dask.array`

A toy example: Generate a 100 x 100 array of random numbers and sum them.

In [None]:
import numpy as np

arr = np.random.random((100, 100))
arr

In [None]:
arr.sum()

For a much larger array, we may not have enough memory on one machine to get this done. Enter ``dask.array``!

In [None]:
import dask.array as da

arr = da.random.random((10000, 10000), chunks=(100, 100)) # very similar interface to numpy -- just adds `chunks`
arr

In [None]:
arr.sum()

This is just a representation of work *still to be done*. When we are ready, we do it by calling `compute()`.

In [None]:
arr.sum().compute()

## Parallelized Gaussian convolution over distributed remote machines

Let us obtain one image of a noisy spot.
## Configuration
Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace `status` with `restart all` and run again.

In [None]:
!supervisorctl -c supervisor/supervisord.conf status

In [None]:
%run scripts/beamline_configuration.py

In [None]:
RE(mv(spot.exp, 0.005)) # low exposure to get high-noise images
RE(count([spot]))

In [None]:
img, = db[-1].data('spot_img')

Convolve the image with a Gaussian. (Blur away the noise.)

In [None]:
def convolve2d(image):
 def gaussian(x, size):
 x = np.asarray(x)
 return 1 / (np.sqrt(2) * np.pi * size) * np.exp(-x**2 / (2 * size**2))
 kernel = gaussian([-1, 0, 1], size=1)
 result = np.empty_like(image)
 for i, row in enumerate(image):
 result[i] = np.convolve(row, kernel, mode='same')
 for i, col in enumerate(result.T):
 result[:, i] = np.convolve(col, kernel, mode='same')
 return result

In [None]:
res = convolve2d(img) # the standard numpy way

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(img)
ax2.imshow(res)
ax1.set_title('raw')
ax2.set_title('convolved with Gaussian')

### Now with dask

We will tell dask to represent the image as 100 x 100 chunks, which can be processed independently by separate workers in parallel. (There is a flaw in this plan....)

In [None]:
dask_img = da.from_array(img, chunks=(100, 100))
dask_res = dask_img.map_blocks(convolve2d, dtype=float).compute()

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(img)
ax2.imshow(dask_res)
ax1.set_title('raw')
ax2.set_title('convolved with Gaussian')

We see the seams. But dask easily be clever by slicing up the image chunks with overlapping margins and correctly reassembling the result.

In [None]:
ghost_dask_res = dask_img.map_overlap(convolve2d, depth=3, dtype=float).compute()

In [None]:
fig, (ax1, ax2) = plt.subplots(1, 2)
ax1.imshow(img)
ax2.imshow(ghost_dask_res)
ax1.set_title('raw')
ax2.set_title('convolved with Gaussian')

## Exercises

Learn more in the [dask documentation](https://dask.pydata.org/en/latest/) and try some examples here.

In [None]:
# Hack away....