{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask Extension"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## If you have problems with this tutorial, try to download the Notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!wget https://jupyter-jsc.fz-juelich.de/static/files/Dask_JURON.ipynb"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook will give you a short introduction into the Dask Extension on JURON. It allows you to run Jobs on the compute nodes, even if your JupyterLab is running interactively on the login node. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Monte-Carlo Estimate of $\\pi$\n",
"\n",
"We want to estimate the number $\\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods) exploiting that the area of a quarter circle of unit radius is $\\pi/4$ and that hence the probability of any randomly chosen point in a unit square to lie in a unit circle centerd at a corner of the unit square is $\\pi/4$ as well. So for N randomly chosen pairs $(x, y)$ with $x\\in[0, 1)$ and $y\\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimage $\\pi \\approx 4 \\cdot N_{circ} / N$.\n",
"\n",
"[
](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)"
]
},
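{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sanity check, here is a minimal serial NumPy sketch of the same estimate. It mirrors the Dask version used later in this notebook, but runs anywhere, without a cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"\n",
"# Serial Monte Carlo sketch: draw N random points in the unit square,\n",
"# count the fraction inside the quarter circle, multiply by 4.\n",
"rng = np.random.default_rng(42)  # fixed seed, for reproducibility\n",
"xy = rng.uniform(0, 1, size=(1_000_000, 2))\n",
"in_circle = (xy ** 2).sum(axis=-1) < 1\n",
"print(4 * in_circle.mean())  # roughly 3.14"
]
},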
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Core Lessons\n",
"\n",
"- setting up SLURM (and other jobqueue) clusters\n",
"- Scaling clusters\n",
"- Adaptive clusters"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up a Slurm cluster\n",
"\n",
"We'll create a SLURM cluster and have a look at the job-script used to start workers on the HPC scheduler."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"from dask.distributed import Client\n",
"from dask_jobqueue import LSFCluster\n",
"import os\n",
"\n",
"dask.config.set({\"jobqueue.lsf.use-stdin\": True})\n",
"cluster = LSFCluster(\n",
" queue=\"normal\",\n",
" walltime=\"60\",\n",
" ncpus=2,\n",
" host=\"192.168.45.25\",\n",
" scheduler_options={\"dashboard_address\": \"0.0.0.0:56755\"},\n",
" death_timeout=\"15s\",\n",
" mem=4 * 1024 * 1024 * 1024,\n",
" log_directory=\"{}/dask_jobqueue_logs\".format(os.getenv(\"HOME\")),\n",
" cores=4,\n",
" locals_directory=\"/tmp\",\n",
" n_workers=4,\n",
" memory=\"128GB\",\n",
" usestd_in=True\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(cluster.job_script())"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = Client(cluster)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## You can visit the Dask Dashboard at the following url: \n",
"```\n",
"https://jupyter-jsc.fz-juelich.de/user///proxy//status\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Afterwards you can press on the orange buttons to open a new tab in your JupyterLab Environment."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scale the cluster to two nodes\n",
"\n",
"A look at the Dashboard reveals that there are no workers in the clusetr. Let's start 4 workers (in 2 SLURM jobs).\n",
"\n",
"For the distiction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster.scale(4) # scale to 4 _workers_"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The Monte Carlo Method"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da\n",
"import numpy as np\n",
"\n",
"\n",
"def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):\n",
" \"\"\"Calculate PI using a Monte Carlo estimate.\"\"\"\n",
"\n",
" size = int(size_in_bytes / 8)\n",
" chunksize = int(chunksize_in_bytes / 8)\n",
"\n",
" xy = da.random.uniform(0, 1, size=(size / 2, 2), chunks=(chunksize / 2, 2))\n",
"\n",
" in_circle = (xy ** 2).sum(axis=-1) < 1\n",
" pi = 4 * in_circle.mean()\n",
"\n",
" return pi\n",
"\n",
"\n",
"def print_pi_stats(size, pi, time_delta, num_workers):\n",
" \"\"\"Print pi, calculate offset from true value, and print some stats.\"\"\"\n",
" print(\n",
" f\"{size / 1e9} GB\\n\"\n",
" f\"\\tMC pi: {pi : 13.11f}\"\n",
" f\"\\tErr: {abs(pi - np.pi) : 10.3e}\\n\"\n",
" f\"\\tWorkers: {num_workers}\"\n",
" f\"\\t\\tTime: {time_delta : 7.3f}s\"\n",
" )"
]
},
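{
"cell_type": "markdown",
"metadata": {},
"source": [
"The helper above splits the random draws into chunks that Dask schedules across the workers. A quick way to inspect the resulting layout (a small sketch; no cluster needed, and the sizes here are illustrative only):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da\n",
"\n",
"# 500,000 pairs split into chunks of 100,000 pairs each\n",
"xy = da.random.uniform(0, 1, size=(500_000, 2), chunks=(100_000, 2))\n",
"print(xy.numblocks)  # (5, 1): five chunks along the first axis\n",
"print(xy.chunks)"
]
},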
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The actual calculations\n",
"\n",
"We loop over different volumes of double-precision random numbers and estimate $\\pi$ as described above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from time import time, sleep"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for size in (1e9 * n for n in (1, 10, 100)):\n",
"\n",
" start = time()\n",
" pi = calc_pi_mc(size).compute()\n",
" elaps = time() - start\n",
"\n",
" print_pi_stats(\n",
" size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Scaling the Cluster to twice its size\n",
"\n",
"We increase the number of workers by 2 and the re-run the experiments."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_num_workers = 2 * len(cluster.scheduler.workers)\n",
"\n",
"print(f\"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.\")\n",
"\n",
"cluster.scale(new_num_workers)\n",
"\n",
"sleep(10)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Re-run same experiments with doubled cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for size in (1e9 * n for n in (1, 10, 100)):\n",
"\n",
" start = time()\n",
" pi = calc_pi_mc(size).compute()\n",
" elaps = time() - start\n",
"\n",
" print_pi_stats(\n",
" size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Automatically Scaling the Cluster\n",
"\n",
"We want each calculation to take only a few seconds. Dask will try to add more workers to the cluster when workloads are high and remove workers when idling.\n",
"\n",
"_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ca = cluster.adapt(minimum=4, maximum=100)\n",
"\n",
"sleep(4) # Allow for scale-down"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Repeat the calculation from above with larger work loads\n",
"\n",
"(And watch the dash board!)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for size in (n * 1e9 for n in (1, 10, 100)):\n",
"\n",
" start = time()\n",
" pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()\n",
" elaps = time() - start\n",
"\n",
" print_pi_stats(\n",
" size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)\n",
" )\n",
"\n",
" sleep(20) # allow for scale-down time"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}