{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Dask Extension" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## If you have problems with this tutorial, try downloading the notebook:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!wget https://jupyter-jsc.fz-juelich.de/static/files/Dask_JURON.ipynb" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This notebook gives you a short introduction to the Dask Extension on JURON. It allows you to run jobs on the compute nodes, even though your JupyterLab is running interactively on the login node." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Monte-Carlo Estimate of $\\pi$\n", "\n", "We want to estimate the number $\\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods), exploiting the fact that the area of a quarter circle of unit radius is $\\pi/4$ and that hence the probability of any randomly chosen point in a unit square lying within a unit circle centered at a corner of that square is $\\pi/4$ as well. So for $N$ randomly chosen pairs $(x, y)$ with $x\\in[0, 1)$ and $y\\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\\pi \\approx 4 \\cdot N_{circ} / N$.\n", "\n", "See [Monte Carlo methods for $\\pi$ (Wikipedia)](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Core Lessons\n", "\n", "- Setting up LSF (and other jobqueue) clusters\n", "- Scaling clusters\n", "- Adaptive clusters" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set up an LSF cluster\n", "\n", "We'll create an LSF cluster and have a look at the job script used to start workers on the HPC scheduler."
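] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you don't have access to an LSF queue, you can follow along with a `LocalCluster` from `dask.distributed`, which exposes the same `scale()`/`adapt()` API. This is a minimal sketch for local experimentation, not part of the JURON setup; the worker counts are illustrative:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Sketch: a local stand-in for LSFCluster with the same client/cluster API\n", "from dask.distributed import Client, LocalCluster\n", "\n", "local_cluster = LocalCluster(n_workers=2, threads_per_worker=2)  # illustrative sizes\n", "local_client = Client(local_cluster)\n", "print(local_client)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [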
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "from dask.distributed import Client\n", "from dask_jobqueue import LSFCluster\n", "import os\n", "\n", "# Submit worker jobs via stdin (bsub < script), as required on some LSF setups\n", "dask.config.set({\"jobqueue.lsf.use-stdin\": True})\n", "cluster = LSFCluster(\n", "    queue=\"normal\",  # LSF queue to submit worker jobs to\n", "    walltime=\"60\",\n", "    ncpus=2,  # CPUs requested from LSF per job\n", "    host=\"192.168.45.25\",\n", "    scheduler_options={\"dashboard_address\": \"0.0.0.0:56755\"},\n", "    death_timeout=\"15s\",\n", "    mem=4 * 1024 * 1024 * 1024,  # memory requested from LSF per job, in bytes\n", "    log_directory=\"{}/dask_jobqueue_logs\".format(os.getenv(\"HOME\")),\n", "    cores=4,  # threads per worker job\n", "    local_directory=\"/tmp\",\n", "    n_workers=4,\n", "    memory=\"128GB\",  # memory limit per worker job\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(cluster.job_script())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = Client(cluster)\n", "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## You can visit the Dask dashboard at the following URL: \n", "```\n", "https://jupyter-jsc.fz-juelich.de/user///proxy//status\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Dask](https://zam10183.zam.kfa-juelich.de/hub/static/images/dask2.png \"dask\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Afterwards, you can click the orange buttons to open a new tab in your JupyterLab environment." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Scale the cluster to two nodes\n", "\n", "A look at the dashboard reveals that there are no workers in the cluster. 
Let's start 4 workers (in 2 LSF jobs).\n", "\n", "For the distinction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "cluster.scale(4)  # scale to 4 _workers_" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The Monte Carlo Method" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask.array as da\n", "import numpy as np\n", "\n", "\n", "def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):\n", "    \"\"\"Calculate PI using a Monte Carlo estimate.\"\"\"\n", "\n", "    size = int(size_in_bytes / 8)  # number of 8-byte (float64) random values\n", "    chunksize = int(chunksize_in_bytes / 8)\n", "\n", "    # Shapes and chunks must be integers, hence the floor division\n", "    xy = da.random.uniform(0, 1, size=(size // 2, 2), chunks=(chunksize // 2, 2))\n", "\n", "    in_circle = (xy ** 2).sum(axis=-1) < 1\n", "    pi = 4 * in_circle.mean()\n", "\n", "    return pi\n", "\n", "\n", "def print_pi_stats(size, pi, time_delta, num_workers):\n", "    \"\"\"Print pi, calculate offset from true value, and print some stats.\"\"\"\n", "    print(\n", "        f\"{size / 1e9} GB\\n\"\n", "        f\"\\tMC pi: {pi : 13.11f}\"\n", "        f\"\\tErr: {abs(pi - np.pi) : 10.3e}\\n\"\n", "        f\"\\tWorkers: {num_workers}\"\n", "        f\"\\t\\tTime: {time_delta : 7.3f}s\"\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The actual calculations\n", "\n", "We loop over different volumes of double-precision random numbers and estimate $\\pi$ as described above."
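] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since LSF jobs can wait in the queue for a while, the timing runs below may start before all workers have connected, which skews the measurements. As an optional sketch, `Client.wait_for_workers` blocks until a given number of workers is up (the count of 4 matches the `cluster.scale(4)` call above):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional: block until the requested workers have connected to the scheduler\n", "client.wait_for_workers(n_workers=4)\n", "print(f\"Workers connected: {len(cluster.scheduler.workers)}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [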
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import time, sleep" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for size in (1e9 * n for n in (1, 10, 100)):\n", "\n", "    start = time()\n", "    pi = calc_pi_mc(size).compute()\n", "    elaps = time() - start\n", "\n", "    print_pi_stats(\n", "        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Scaling the Cluster to twice its size\n", "\n", "We double the number of workers and then re-run the experiments." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "new_num_workers = 2 * len(cluster.scheduler.workers)\n", "\n", "print(f\"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.\")\n", "\n", "cluster.scale(new_num_workers)\n", "\n", "sleep(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Re-run the same experiments with the doubled cluster" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for size in (1e9 * n for n in (1, 10, 100)):\n", "\n", "    start = time()\n", "    pi = calc_pi_mc(size).compute()\n", "    elaps = time() - start\n", "\n", "    print_pi_stats(\n", "        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", "    )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Automatically Scaling the Cluster\n", "\n", "We want each calculation to take only a few seconds. 
Dask will try to add more workers to the cluster when workloads are high and remove workers when they are idle.\n", "\n", "_**Watch** how the cluster scales down to the minimum a few seconds after being made adaptive._" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ca = cluster.adapt(minimum=4, maximum=100)\n", "\n", "sleep(4)  # Allow for scale-down" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Repeat the calculation from above with larger workloads\n", "\n", "(And watch the dashboard!)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for size in (n * 1e9 for n in (1, 10, 100)):\n", "\n", "    start = time()\n", "    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()\n", "    elaps = time() - start\n", "\n", "    print_pi_stats(\n", "        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n", "    )\n", "\n", "    sleep(20)  # allow for scale-down time" ] } ], "metadata": { "anaconda-cloud": {}, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" } }, "nbformat": 4, "nbformat_minor": 4 }