{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Custom Workloads\n",
    "-------------------------\n",
    "\n",
    "*Because not all problems are dataframes*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_gateway import Gateway\n",
    "\n",
    "gateway = Gateway()\n",
    "cluster = gateway.new_cluster()\n",
    "cluster.scale(10)\n",
    "cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, progress\n",
    "c = Client(cluster)\n",
    "c"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from time import sleep\n",
    "\n",
    "# Each function sleeps for a random fraction of a second to simulate work.\n",
    "def inc(x):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return x + 1\n",
    "\n",
    "def double(x):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return 2 * x\n",
    "\n",
    "def add(x, y):\n",
    "    from random import random\n",
    "    sleep(random())\n",
    "    return x + y"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "inc(1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future = c.submit(inc, 1)  # returns immediately with a pending future\n",
    "future"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future  # re-run this cell: the scheduler keeps the client updated, so the status changes once the task completes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "future.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Submit many tasks\n",
    "\n",
    "Futures can be passed as arguments to later `submit` calls, so we can build many tasks that depend on each other in a normal Python `for` loop."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "zs = []\n",
    "for i in range(256):\n",
    "    x = c.submit(inc, i)     # x = inc(i)\n",
    "    y = c.submit(double, x)  # y = double(x)\n",
    "    z = c.submit(add, x, y)  # z = add(x, y)\n",
    "    zs.append(z)\n",
    "\n",
    "total = c.submit(sum, zs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "total.result()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Custom computation: Tree summation\n",
    "\n",
    "As an example of a non-trivial algorithm, consider the classic tree reduction. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic.\n",
    "\n",
    "```\n",
    "finish           total             single output\n",
    "    ^           /      \\\n",
    "    |        c1          c2        neighbors merge\n",
    "    |       /  \\        /  \\\n",
    "    |     b1    b2    b3    b4     neighbors merge\n",
    "    ^    / \\   / \\   / \\   / \\\n",
    "start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "L = zs\n",
    "while len(L) > 1:  # assumes len(L) is a power of two (256 here)\n",
    "    new_L = []\n",
    "    for i in range(0, len(L), 2):\n",
    "        future = c.submit(add, L[i], L[i + 1])  # add neighbors\n",
    "        new_L.append(future)\n",
    "    L = new_L  # swap old list for new"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "del L"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Visualize Computation\n",
    "\n",
    "To see the shape of the tree, we rebuild it with `dask.delayed`, which records the tasks lazily instead of running them, and render the resulting graph with `visualize`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask import delayed, visualize\n",
    "\n",
    "L = zs\n",
    "while len(L) > 1:\n",
    "    new_L = []\n",
    "    for i in range(0, len(L), 2):\n",
    "        task = delayed(add)(L[i], L[i + 1])  # add neighbors lazily\n",
    "        new_L.append(task)\n",
    "    L = new_L  # swap old list for new\n",
    "\n",
    "visualize(*L)  # rendering the graph requires graphviz"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}