{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask Tutorial\n", "\n", "\n", "
\n", "\n", "### Overview\n", " \n", "* **teaching:** 20 minutes\n", "* **exercises:** 0\n", "* **questions:**\n", " * How does Dask parallelize computations in Python?\n", "
" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Table of contents\n", "1. [**Dask primer**](#Dask-primer)\n", "1. [**Dask clusters**](#Dask-Clusters)\n", "1. [**Dask dataframe**](#Dask-Dataframe)\n", "1. [**Dask arrays**](#Dask-Arrays)\n", "1. [**Dask delayed**](#Dask-Delayed)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Primer\n", "\n", "\"Dask\n", "\n", "\n", "Dask is a flexible parallel computing library for analytic computing. Dask provides dynamic parallel task scheduling and high-level big-data collections like `dask.array` and `dask.dataframe`. More on dask here: https://docs.dask.org/en/latest/\n", "\n", "_Note: Pieces of this notebook comes from the following sources:_\n", "\n", "- https://github.com/rabernat/research_computing\n", "- https://github.com/dask/dask-examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Clusters\n", "\n", "Dask needs a collection of computing resources in order to perform parallel computations. Dask Clusters have different names corresponding to different computing environments (for example, [LocalCluster](https://distributed.dask.org/en/latest/local-cluster.html) for your Laptop, [PBSCluster](http://jobqueue.dask.org/) for your HPC, or [Kubernetes Cluster](http://kubernetes.dask.org/) for machines on the Cloud). Each cluster has a certain number of computing resources called 'Workers', that each get allocated CPU and RAM. The dask scheduling system maps jobs to each worker on a cluster for you, so the syntax is mostly the same once you initialize a cluster!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let's start simple with a LocalCluster that makes use of all the cores and RAM we have on a single machine\n", "from dask.distributed import Client, LocalCluster\n", "cluster = LocalCluster()\n", "# explicitly connect to the cluster we just created\n", "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Dataframe\n", "\n", "If you are working with a very large Pandas dataframe, you can consider parallizing computations by turning it into a Dask Dataframe. Dask Dataframes split a dataframe into partitions along an index. They support a large subset of the Pandas API. You can find additional details and examples here https://examples.dask.org/dataframe.html\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Although this is small csv file, we'll reuse our same example from before!\n", "# Load csv results from server into a Pandas DataFrame\n", "import dask.dataframe as dd\n", "server = 'https://webservices.volcano.si.edu/geoserver/GVP-VOTW/ows?'\n", "query = 'service=WFS&version=2.0.0&request=GetFeature&typeName=GVP-VOTW:Smithsonian_VOTW_Holocene_Volcanoes&outputFormat=csv'\n", "\n", "# blocksize=None means use a single partion\n", "df = dd.read_csv(server+query, blocksize=None)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We only see the metadata, the actual data are only computed when requested.\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# We can break up the table into 4 partions to map out to each core:\n", "df = df.repartition(npartitions=4)\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let's say we want to know the minimum last eruption year for all volcanoes\n", "last_eruption_year_min = df.Last_Eruption_Year.min()\n", "last_eruption_year_min" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Instead of getting the actual value we see dd.Scalar, which represents a recipe for actually calculating this value\n", "last_eruption_year_min.visualize(format='svg')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# To get the value call the 'compute method'\n", "# NOTE: this was slower than using pandas directly,,, for small data you often don't need to use parallel computing!\n", "last_eruption_year_min.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Arrays\n", "\n", "A dask array looks and feels a lot like a numpy array.\n", "However, a dask array doesn't directly hold any data.\n", "Instead, it symbolically represents the computations needed to generate the data.\n", "Nothing is actually computed until the actual numerical values are needed.\n", "This mode of operation is called \"lazy\"; it allows one to build up complex, large calculations symbolically before turning them over the scheduler for execution.\n", "\n", "If we want to create a numpy array of all ones, we do it like this:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "shape = (1000, 4000)\n", "ones_np = np.ones(shape)\n", "ones_np" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This array contains exactly 32 MB of data:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('%.1f MB' % (ones_np.nbytes / 1e6))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's create the same array using dask's array interface." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.array as da\n", "ones = da.ones(shape)\n", "ones" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This works, but we didn't tell dask how to split up the array, so it is not optimized for distributed computation.\n", "\n", "A crucal difference with dask is that we must specify the `chunks` argument. \"Chunks\" describes how the array is split up over many sub-arrays.\n", "\n", "![Dask Arrays](http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg)\n", "_source: [Dask Array Documentation](http://dask.pydata.org/en/latest/array-overview.html)_\n", "\n", "There are [several ways to specify chunks](http://dask.pydata.org/en/latest/array-creation.html#chunks).\n", "In this lecture, we will use a block shape." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "chunk_shape = (1000, 1000)\n", "ones = da.ones(shape, chunks=chunk_shape)\n", "ones" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize.\n", "No data has been generated yet.\n", "When we call `.compute()` on a dask array, the computation is trigger and the dask array becomes a numpy array." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ones.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In order to understand what happened when we called `.compute()`, we can visualize the dask _graph_, the symbolic operations that make up the array" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ones.visualize(format='svg')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Our array has four chunks. To generate it, dask calls `np.ones` four times and then concatenates this together into one array.\n", "\n", "Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to reduce the data somehow. For example:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sum_of_ones = ones.sum()\n", "sum_of_ones.visualize(format='svg')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we see dask's strategy for finding the sum. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data. \n", "\n", "If we make our operation more complex, the graph gets more complex." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fancy_calculation = (ones * ones[::-1, ::-1]).mean()\n", "fancy_calculation.visualize(format='svg')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A Bigger Calculation\n", "\n", "The examples above were toy examples; the data (32 MB) is nowhere nearly big enough to warrant the use of dask.\n", "\n", "We can make it a lot bigger!" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bigshape = (200000, 4000)\n", "big_ones = da.ones(bigshape, chunks=chunk_shape)\n", "big_ones" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('%.1f MB' % (big_ones.nbytes / 1e6))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of available RAM than you have in your computer. Nevertheless, dask has no problem working on it.\n", "\n", "_Do not try to `.visualize()` this array!_\n", "\n", "When doing a big calculation, dask also has some tools to help us understand what is happening under the hood. Let's watch the dashboard again as we do a bigger computation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "big_calc = (big_ones * big_ones[::-1, ::-1]).mean()\n", "\n", "result = big_calc.compute()\n", "result" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reduction \n", "\n", "All the usual numpy methods work on dask arrays.\n", "You can also apply numpy function directly to a dask array, and it will stay lazy." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "big_ones_reduce = (np.cos(big_ones)**2).mean(axis=1)\n", "big_ones_reduce" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plotting also triggers computation, since we need the actual values" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from matplotlib import pyplot as plt\n", "%matplotlib inline\n", "plt.rcParams['figure.figsize'] = (12,8)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "plt.plot(big_ones_reduce)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Dask Delayed\n", "\n", "Dask.delayed is a simple and powerful way to parallelize existing code. It allows users to delay function calls into a task graph with dependencies. Dask.delayed doesn't provide any fancy parallel algorithms like Dask.dataframe, but it does give the user complete control over what they want to build.\n", "\n", "Systems like Dask.dataframe are built with Dask.delayed. If you have a problem that is paralellizable, but isn't as simple as just a big array or a big dataframe, then dask.delayed may be the right choice for you.\n", "\n", "## Create simple functions\n", "\n", "These functions do simple operations like add two numbers together, but they sleep for a random amount of time to simulate real work." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "\n", "def inc(x):\n", " time.sleep(0.1)\n", " return x + 1\n", "\n", "def dec(x):\n", " time.sleep(0.1)\n", " return x - 1\n", " \n", "def add(x, y):\n", " time.sleep(0.2)\n", " return x + y " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can run them like normal Python functions below" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "x = inc(1)\n", "y = dec(2)\n", "z = add(x, y)\n", "z" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "These ran one after the other, in sequence. Note though that the first two lines `inc(1)` and `dec(2)` don't depend on each other, we *could* have called them in parallel had we been clever.\n", "\n", "## Annotate functions with Dask Delayed to make them lazy\n", "\n", "We can call `dask.delayed` on our funtions to make them lazy. Rather than compute their results immediately, they record what we want to compute as a task into a graph that we'll run later on parallel hardware." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "inc = dask.delayed(inc)\n", "dec = dask.delayed(dec)\n", "add = dask.delayed(add)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Calling these lazy functions is now almost free. We're just constructing a graph" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "x = inc(1)\n", "y = dec(2)\n", "z = add(x, y)\n", "z" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Visualize computation" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "z.visualize(format='svg', rankdir='LR')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run in parallel\n", "\n", "Call `.compute()` when you want your result as a normal Python object\n", "\n", "If you started `Client()` above then you may want to watch the status page during computation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "z.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Parallelize Normal Python code\n", "\n", "Now we use Dask in normal for-loopy Python code. This generates graphs instead of doing computations directly, but still looks like the code we had before. Dask is a convenient way to add parallelism to existing workflows." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "zs = []\n", "for i in range(256):\n", " x = inc(i)\n", " y = dec(x)\n", " z = add(x, y)\n", " zs.append(z)\n", " \n", "zs = dask.persist(*zs) # trigger computation in the background" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }