{ "cells": [ { "cell_type": "markdown", "id": "human-aquarium", "metadata": {}, "source": [ "## Scale with Dask\n", "\n", "The [Planetary Computer Hub](http://planetarycomputer.microsoft.com/compute) is a JupyterHub paired with [Dask Gateway](https://gateway.dask.org/) for easily creating Dask clusters to distribute your computation across a cluster of machines." ] }, { "cell_type": "markdown", "id": "cellular-corps", "metadata": {}, "source": [ "### Create a cluster\n", "\n", "Use `dask_gateway.GatewayCluster` to quickly create a Dask cluster." ] }, { "cell_type": "code", "execution_count": 1, "id": "facial-organic", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "GatewayCluster\n" ] } ], "source": [ "import dask_gateway\n", "\n", "cluster = dask_gateway.GatewayCluster()\n", "client = cluster.get_client()  # route Dask computations to the cluster\n", "cluster.scale(4)  # request 4 workers\n", "print(cluster.dashboard_link)" ] }, { "cell_type": "markdown", "id": "presidential-healthcare", "metadata": {}, "source": [ "**Don't forget the `client = cluster.get_client()` line.** That's what ensures your Dask computations actually run on the cluster. Otherwise, you'll end up using Dask's [local scheduler](https://docs.dask.org/en/stable/10-minutes-to-dask.html#scheduling), which runs the computation using multiple threads on a single machine rather than on the cluster. When you're using a cluster, make sure to always keep [the Dashboard](https://docs.dask.org/en/stable/dashboard.html) open (more below). If you aren't seeing any tasks in the dashboard, you might have forgotten to create a Dask client.\n", "\n", "\n", "### Open the dashboard\n", "\n", "The [Dask Dashboard](https://docs.dask.org/en/latest/diagnostics-distributed.html) provides invaluable information on the activity of your cluster. 
Clicking the \"Dashboard\" link above will open the Dask dashboard in a new browser tab.\n", "\n", "We also include the [dask-labextension](https://github.com/dask/dask-labextension) for laying out the Dask dashboard as tabs in the JupyterLab workspace.\n", "\n", "To use the dask-labextension, copy the \"Dashboard\" address from the cluster repr, click the orange Dask logo on the left-hand navigation bar, and paste the dashboard address.\n", "\n", "You can close your cluster, freeing up its resources, by calling `cluster.close()`." ] }, { "cell_type": "code", "execution_count": 2, "id": "comparative-alias", "metadata": {}, "outputs": [], "source": [ "cluster.close()" ] }, { "cell_type": "markdown", "id": "complex-captain", "metadata": {}, "source": [ "### Autoscale the cluster to your workload\n", "\n", "Dask clusters can [automatically adapt](https://docs.dask.org/en/latest/setup/adaptive.html) their size to the size of the workload. Use `cluster.adapt(minimum, maximum)` to enable adaptive mode." ] }, { "cell_type": "code", "execution_count": 3, "id": "brown-punishment", "metadata": {}, "outputs": [], "source": [ "import dask_gateway\n", "\n", "cluster = dask_gateway.GatewayCluster()\n", "client = cluster.get_client()\n", "# Scale between 2 and 50 workers depending on the workload\n", "cluster.adapt(minimum=2, maximum=50)" ] }, { "cell_type": "markdown", "id": "listed-genetics", "metadata": {}, "source": [ "Dask will add workers as necessary when a computation is submitted. As an example, we'll compute the minimum daily temperature averaged over all of Hawaii, using the [Daymet dataset](http://aka.ms/ai4edata-daymet)." ] }, { "cell_type": "code", "execution_count": 4, "id": "broken-shark", "metadata": {}, "outputs": [ { "data": { "text/html": [ "
<xarray.Dataset>\n",
       "Dimensions:                  (nv: 2, time: 14965, x: 284, y: 584)\n",
       "Coordinates:\n",
       "    lat                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>\n",
       "    lon                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>\n",
       "  * time                     (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...\n",
       "  * x                        (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06\n",
       "  * y                        (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05\n",
       "Dimensions without coordinates: nv\n",
       "Data variables:\n",
       "    dayl                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    lambert_conformal_conic  int16 ...\n",
       "    prcp                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    srad                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    swe                      (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    time_bnds                (time, nv) datetime64[ns] dask.array<chunksize=(365, 2), meta=np.ndarray>\n",
       "    tmax                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    tmin                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    vp                       (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n",
       "    yearday                  (time) int16 dask.array<chunksize=(365,), meta=np.ndarray>\n",
       "Attributes:\n",
       "    Conventions:       CF-1.6\n",
       "    Version_data:      Daymet Data Version 4.0\n",
       "    Version_software:  Daymet Software Version 4.0\n",
       "    citation:          Please see http://daymet.ornl.gov/ for current Daymet ...\n",
       "    references:        Please see http://daymet.ornl.gov/ for current informa...\n",
       "    source:            Daymet Software Version 4.0\n",
       "    start_year:        1980
" ], "text/plain": [ "<xarray.Dataset>\n", "Dimensions:                  (nv: 2, time: 14965, x: 284, y: 584)\n", "Coordinates:\n", "    lat                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>\n", "    lon                      (y, x) float32 dask.array<chunksize=(584, 284), meta=np.ndarray>\n", "  * time                     (time) datetime64[ns] 1980-01-01T12:00:00 ... 20...\n", "  * x                        (x) float32 -5.802e+06 -5.801e+06 ... -5.519e+06\n", "  * y                        (y) float32 -3.9e+04 -4e+04 ... -6.21e+05 -6.22e+05\n", "Dimensions without coordinates: nv\n", "Data variables:\n", "    dayl                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    lambert_conformal_conic  int16 ...\n", "    prcp                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    srad                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    swe                      (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    time_bnds                (time, nv) datetime64[ns] dask.array<chunksize=(365, 2), meta=np.ndarray>\n", "    tmax                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    tmin                     (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    vp                       (time, y, x) float32 dask.array<chunksize=(365, 584, 284), meta=np.ndarray>\n", "    yearday                  (time) int16 dask.array<chunksize=(365,), meta=np.ndarray>\n", "Attributes:\n", "    Conventions:       CF-1.6\n", "    Version_data:      Daymet Data Version 4.0\n", "    Version_software:  Daymet Software Version 4.0\n", "    citation:          Please see http://daymet.ornl.gov/ for current Daymet ...\n", "    references:        Please see http://daymet.ornl.gov/ for current informa...\n", "    source:            Daymet Software Version 4.0\n", "    start_year:        1980" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import pystac_client\n", "import planetary_computer\n", "import xarray as xr\n", "\n", "# Open the Planetary Computer STAC API; sign_inplace signs assets so they can be read\n", "catalog = pystac_client.Client.open(\n", "    \"https://planetarycomputer.microsoft.com/api/stac/v1\",\n", "    modifier=planetary_computer.sign_inplace,\n", ")\n", "asset = catalog.get_collection(\"daymet-daily-hi\").assets[\"zarr-abfs\"]\n", "\n", "# Lazily open the Zarr store; variables are backed by Dask arrays\n", "ds = xr.open_zarr(\n", "    asset.href,\n", "    **asset.extra_fields[\"xarray:open_kwargs\"],\n", "    storage_options=asset.extra_fields[\"xarray:storage_options\"],\n", ")\n", "ds" ] }, { "cell_type": "markdown", "id": "generous-controversy", "metadata": {}, "source": [ "The `.compute()` call in the 
next cell is what triggers computation and causes Dask to scale the cluster up to a dozen or so workers." ] }, { "cell_type": "code", "execution_count": 5, "id": "atmospheric-liability", "metadata": {}, "outputs": [ { "data": { "text/html": [ "" ], "text/plain": [ "
" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import matplotlib.pyplot as plt\n", "\n", "# Average tmin over space for each day; .compute() runs the work on the cluster\n", "timeseries = ds[\"tmin\"].mean(dim=[\"x\", \"y\"]).compute()\n", "\n", "fig, ax = plt.subplots(figsize=(12, 6))\n", "timeseries.plot(ax=ax);" ] }, { "cell_type": "code", "execution_count": 6, "id": "advisory-venice", "metadata": {}, "outputs": [], "source": [ "cluster.close()" ] }, { "cell_type": "markdown", "id": "literary-advancement", "metadata": {}, "source": [ "### Customize your cluster\n", "\n", "`dask_gateway.GatewayCluster` creates a cluster with some default settings, which might not be appropriate for your workload. For example, a memory-intensive workload might require more memory per CPU core, or you might need to set environment variables on the workers.\n", "\n", "To customize your cluster, create a `Gateway` object and retrieve its cluster options with `gateway.cluster_options()`." ] }, { "cell_type": "code", "execution_count": 7, "id": "usual-wings", "metadata": {}, "outputs": [], "source": [ "import dask_gateway\n", "\n", "gateway = dask_gateway.Gateway()\n", "cluster_options = gateway.cluster_options()" ] }, { "cell_type": "markdown", "id": "postal-audience", "metadata": {}, "source": [ "In a Jupyter notebook, you can customize the options with the HTML widget. Alternatively, you can set the values programmatically in Python. Here we'll ask for 16 GiB of memory per worker." ] }, { "cell_type": "code", "execution_count": 8, "id": "beginning-dragon", "metadata": {}, "outputs": [], "source": [ "cluster_options[\"worker_memory\"] = 16" ] }, { "cell_type": "markdown", "id": "collected-accuracy", "metadata": {}, "source": [ "Now create your cluster. **Make sure to pass the `cluster_options` object to `gateway.new_cluster`**." 
] }, { "cell_type": "code", "execution_count": 9, "id": "geological-hierarchy", "metadata": {}, "outputs": [], "source": [ "# Pass the customized options when creating the cluster\n", "cluster = gateway.new_cluster(cluster_options)\n", "client = cluster.get_client()\n", "cluster.scale(2)" ] }, { "cell_type": "markdown", "id": "fossil-indianapolis", "metadata": {}, "source": [ "### Learn more\n", "\n", "The [Dask documentation](https://docs.dask.org/en/latest/) has much more information on using Dask for scalable computing. This JupyterHub deployment uses [Dask Gateway](https://gateway.dask.org/) to create and manage Dask clusters." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.10" }, "widgets": { "application/vnd.jupyter.widget-state+json": { "state": {}, "version_major": 2, "version_minor": 0 } } }, "nbformat": 4, "nbformat_minor": 5 }