{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallelize with Dask\n", "\n", "**This notebooks requires a dask cluster, which is not always available. We provide it during special tutorial events and sometimes at other times.**\n", "\n", "In this notebook you will:\n", "\n", "* Use ``dask`` to parallelize computations on images.\n", "\n", "Recommended Prerequisites:\n", "\n", "* [Handling Images](./Handling%20Images.ipynb)\n", "\n", "[Dask Dashboard](http://ae8cfeafc5ad611e88b00021c84f1ed3-1000724570.us-east-1.elb.amazonaws.com/status)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "SCHEDULER_ADDRESS = 'ae8cfeafc5ad611e88b00021c84f1ed3-1000724570.us-east-1.elb.amazonaws.com:8786'\n", "\n", "from dask.distributed import Client\n", "cli = Client(SCHEDULER_ADDRESS)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compare `numpy` and `dask.array`\n", "\n", "A toy example: Generate a 100 x 100 array of random numbers and sum them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "arr = np.random.random((100, 100))\n", "arr" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "arr.sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For a much larger array, we may not have enough memory on one machine to get this done. Enter ``dask.array``!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.array as da\n", "\n", "arr = da.random.random((10000, 10000), chunks=(100, 100)) # very similar interface to numpy -- just adds `chunks`\n", "arr" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "arr.sum()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is just a representation of work *still to be done*. When we are ready, we do it by calling `compute()`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "arr.sum().compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelized Gaussian convolution over distributed remote machines" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let us obtain one image of a noisy spot.\n", "## Configuration\n", "Below, we will connect to EPICS IOC(s) controlling simulated hardware in lieu of actual motors, detectors. The IOCs should already be running in the background. Run this command to verify that they are running: it should produce output with RUNNING on each line. In the event of a problem, edit this command to replace `status` with `restart all` and run again." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!supervisorctl -c supervisor/supervisord.conf status" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%run scripts/beamline_configuration.py" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "RE(mv(spot.exp, 0.005)) # low exposure to get high-noise images\n", "RE(count([spot]))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "img, = db[-1].data('spot_img')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Convolve the image with a Gaussian. 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def convolve2d(image):\n", "    def gaussian(x, size):\n", "        x = np.asarray(x)\n", "        return 1 / (np.sqrt(2 * np.pi) * size) * np.exp(-x**2 / (2 * size**2))\n", "    kernel = gaussian([-1, 0, 1], size=1)\n", "    result = np.empty_like(image)\n", "    # The Gaussian kernel is separable: convolve each row, then each column.\n", "    for i, row in enumerate(image):\n", "        result[i] = np.convolve(row, kernel, mode='same')\n", "    for i, col in enumerate(result.T):\n", "        result[:, i] = np.convolve(col, kernel, mode='same')\n", "    return result" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "res = convolve2d(img)  # the standard numpy way" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt  # ensure plt is defined (harmless if already imported)\n", "\n", "fig, (ax1, ax2) = plt.subplots(1, 2)\n", "ax1.imshow(img)\n", "ax2.imshow(res)\n", "ax1.set_title('raw')\n", "ax2.set_title('convolved with Gaussian')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Now with dask\n", "\n", "We will tell dask to represent the image as 100 x 100 chunks, which can be processed independently by separate workers in parallel. (There is a flaw in this plan....)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dask_img = da.from_array(img, chunks=(100, 100))\n", "dask_res = dask_img.map_blocks(convolve2d, dtype=float).compute()  # each chunk is convolved independently" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, (ax1, ax2) = plt.subplots(1, 2)\n", "ax1.imshow(img)\n", "ax2.imshow(dask_res)\n", "ax1.set_title('raw')\n", "ax2.set_title('convolved with Gaussian')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see the seams. But dask can easily be clever: it can slice up the image into chunks with overlapping margins and correctly reassemble the result." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ghost_dask_res = dask_img.map_overlap(convolve2d, depth=3, dtype=float).compute()  # depth=3 shares a 3-pixel margin between neighboring chunks" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, (ax1, ax2) = plt.subplots(1, 2)\n", "ax1.imshow(img)\n", "ax2.imshow(ghost_dask_res)\n", "ax1.set_title('raw')\n", "ax2.set_title('convolved with Gaussian')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Exercises" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Learn more in the [dask documentation](https://dask.pydata.org/en/latest/) and try some examples here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Hack away...." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }