{ "cells": [
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "*This notebook contains material from [PyRosetta](https://RosettaCommons.github.io/PyRosetta.notebooks);\n", "content is available [on GitHub](https://github.com/RosettaCommons/PyRosetta.notebooks.git).*" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "< [Example of Using PyRosetta with GNU Parallel](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.03-GNU-Parallel-Via-Slurm.ipynb) | [Contents](toc.ipynb) | [Index](index.ipynb) | [Part I: Parallelized Global Ligand Docking with `pyrosetta.distributed`](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.05-Ligand-Docking-dask.ipynb) >" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Examples Using the `dask` Module" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### We can make use of the `dask` library to parallelize code" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "*Note:* This Jupyter notebook uses parallelization and is **not** meant to be executed within a Google Colab environment." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "*Note:* This Jupyter notebook requires the PyRosetta distributed layer, which is obtained by building PyRosetta with the `--serialization` flag or by installing PyRosetta from the RosettaCommons conda channel.\n", "\n", "**Please see Chapter 16.00 for setup instructions**" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "import dask.array as da\n", "import graphviz\n", "import logging\n", "logging.basicConfig(level=logging.INFO)\n", "import numpy as np\n", "import os\n", "import pyrosetta\n", "import pyrosetta.distributed\n", "import pyrosetta.distributed.dask\n", "import pyrosetta.distributed.io as io\n", "import random\n", "import sys\n", "\n", "from dask.distributed import Client, LocalCluster, progress\n", "from dask_jobqueue import SLURMCluster\n", "from IPython.display import display, Image\n", "\n", "if 'google.colab' in sys.modules:\n", "    print(\"This Jupyter notebook uses parallelization and is therefore not set up for the Google Colab environment.\")\n", "    sys.exit(0)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Initialize PyRosetta within this Jupyter notebook using custom command line PyRosetta flags:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "flags = \"\"\"-out:level 100\n", "-ignore_unrecognized_res 1\n", " -ignore_waters 0 \n", " -detect_disulf 0 # Do not automatically detect disulfides\n", "\"\"\" # These can be unformatted for user convenience, but no spaces in file paths!\n", "pyrosetta.distributed.init(flags)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "If you are running this example on a high-performance computing (HPC) cluster with SLURM scheduling, use the `SLURMCluster` class described below. For more information, visit https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html. **Note**: If your HPC cluster uses a job scheduler other than SLURM, `dask_jobqueue` supports other job schedulers too: http://jobqueue.dask.org/en/latest/api.html\n", "\n", "The `SLURMCluster` class in the `dask_jobqueue` module is very useful! In this case, we request four workers using `cluster.scale(4)` and specify that each worker has:\n", "- one thread per worker with `cores=1`\n", "- one process per worker with `processes=1`\n", "- one CPU per task per worker with `job_cpu=1`\n", "- a total of 4GB of memory per worker with `memory=\"4GB\"`\n", "- its job submitted to the \"short\" queue/partition of the SLURM scheduler with `queue=\"short\"`\n", "- a maximum job walltime of just under 3 hours using `walltime=\"02:59:00\"`\n", "- dask worker files written to the `local_directory` directory\n", "- SLURM log files written to the given file path and file name (plus any other `#SBATCH` options) with the `job_extra` option\n", "- pre-initialization with the same custom command line PyRosetta flags used in this Jupyter notebook, using the `extra=pyrosetta.distributed.dask.worker_extra(init_flags=flags)` option\n", "\n", "*Note:* Newer releases of `dask_jobqueue` have renamed `job_extra` to `job_extra_directives` and `extra` to `worker_extra_args`." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    scratch_dir = os.path.join(\"/net/scratch\", os.environ[\"USER\"])\n", "    cluster = SLURMCluster(\n", "        cores=1,\n", "        processes=1,\n", "        job_cpu=1,\n", "        memory=\"4GB\",\n", "        queue=\"short\",\n", "        walltime=\"02:59:00\",\n", "        local_directory=scratch_dir,\n", "        job_extra=[\"-o {}\".format(os.path.join(scratch_dir, \"slurm-%j.out\"))],\n", "        extra=pyrosetta.distributed.dask.worker_extra(init_flags=flags)\n", "    )\n", "    cluster.scale(4)\n", "    client = Client(cluster)\n", "else:\n", "    cluster = None\n", "    client = None" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "**Note**: The actual sbatch script submitted to the Slurm scheduler under the hood was:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    print(cluster.job_script())" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Otherwise, if you are running this example locally on your laptop, you can still spawn workers and take advantage of the `dask` module:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# cluster = LocalCluster(n_workers=1, threads_per_worker=1)\n", "# client = Client(cluster)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Display the `client`. Its summary includes a link to the `dask` dashboard, which shows diagnostic information about the current state of your cluster and helps track progress, identify performance issues, and debug failures:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Consider the following example that runs within this Jupyter notebook kernel just fine but could be parallelized:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def inc(x):\n", "    return x + 1\n", "\n", "def double(x):\n", "    return 2 * x\n", "\n", "def add(x, y):\n", "    return x + y" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output = []\n", "for x in range(10):\n", "    a = inc(x)\n", "    b = double(x)\n", "    c = add(a, b)\n", "    output.append(c)\n", "\n", "total = sum(output)\n", "print(total)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "With a slight modification, we can parallelize it on the HPC cluster using the `dask` module:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output = []\n", "for x in range(10):\n", "    a = dask.delayed(inc)(x)\n", "    b = dask.delayed(double)(x)\n", "    c = dask.delayed(add)(a, b)\n", "    output.append(c)\n", "\n", "delayed = dask.delayed(sum)(output)\n", "print(delayed)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We used the `dask.delayed` function to wrap the function calls that we want to turn into tasks. None of the `inc`, `double`, `add`, or `sum` calls have happened yet. Instead, the object `delayed` is a `Delayed` object that contains a task graph of the entire computation to be executed.\n", "\n", "Let's visualize the task graph to see clear opportunities for parallel execution." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    display(delayed.visualize())" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We can now compute this lazy result to execute the graph in parallel:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    total = delayed.compute()\n", "    print(total)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We can also use `dask.delayed` as a Python function decorator, which builds an equivalent task graph with less boilerplate:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@dask.delayed\n", "def inc(x):\n", "    return x + 1\n", "\n", "@dask.delayed\n", "def double(x):\n", "    return 2 * x\n", "\n", "@dask.delayed\n", "def add(x, y):\n", "    return x + y\n", "\n", "output = []\n", "for x in range(10):\n", "    a = inc(x)\n", "    b = double(x)\n", "    c = add(a, b)\n", "    output.append(c)\n", "\n", "total = dask.delayed(sum)(output).compute()\n", "print(total)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "We can also use the `dask.array` library, which implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up a large array into many small arrays that can be processed in parallel. In the next cell, each random array has shape `(10000, 10000, 10)`, i.e. one billion float64 values (about 8 GB), and `chunks=(1000, 1000, 5)` splits it into 200 chunks of about 40 MB each, so the workers process it chunk by chunk instead of holding a whole array in memory.\n", "\n", "See the `dask.array` documentation at http://docs.dask.org/en/latest/array.html, along with the documentation for `dask.bag`, `dask.dataframe`, `dask.delayed`, `Futures`, etc." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    x = da.random.random((10000, 10000, 10), chunks=(1000, 1000, 5))\n", "    y = da.random.random((10000, 10000, 10), chunks=(1000, 1000, 5))\n", "    z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))\n", "    z.compute()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The dask dashboard allows visualizing parallel computation, including progress bars for tasks. Here is a snapshot of the dask dashboard while executing the previous cell:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(filename=\"inputs/dask_dashboard_example.png\")" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "For more info on interpreting the dask dashboard, see: https://distributed.dask.org/en/latest/web.html" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "# Example Using `dask.delayed` with PyRosetta" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Let's look at a simple example of sending PyRosetta jobs to the `dask` workers, which send the results back to this Jupyter notebook.\n", "\n", "We will use the crystal structure of the *de novo* mini protein gEHEE_06 from PDB ID 5JG9. For 10 randomly sampled positions in chain A (skipping cysteines), we mutate the residue to alanine and to tryptophan, minimize each mutant in Cartesian space with `ref2015_cart`, score it with `ref2015`, and report the mutant with the lowest total score." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@dask.delayed\n", "def mutate(ppose, target, new_res):\n", "    import pyrosetta\n", "    # Unpack the serialized pose and mutate the target residue\n", "    pose = io.to_pose(ppose)\n", "    mutate_mover = pyrosetta.rosetta.protocols.simple_moves.MutateResidue(target=target, new_res=new_res)\n", "    mutate_mover.apply(pose)\n", "    return io.to_packed(pose)\n", "\n", "@dask.delayed\n", "def refine(ppose):\n", "    import pyrosetta\n", "    # Minimize backbone and side chains in Cartesian space\n", "    pose = io.to_pose(ppose)\n", "    scorefxn = pyrosetta.create_score_function(\"ref2015_cart\")\n", "    mm = pyrosetta.rosetta.core.kinematics.MoveMap()\n", "    mm.set_bb(True)\n", "    mm.set_chi(True)\n", "    min_mover = pyrosetta.rosetta.protocols.minimization_packing.MinMover()\n", "    min_mover.set_movemap(mm)\n", "    min_mover.score_function(scorefxn)\n", "    min_mover.min_type(\"lbfgs_armijo_nonmonotone\")\n", "    min_mover.cartesian(True)\n", "    min_mover.tolerance(0.01)\n", "    min_mover.max_iter(200)\n", "    min_mover.apply(pose)\n", "    return io.to_packed(pose)\n", "\n", "@dask.delayed\n", "def score(ppose):\n", "    import pyrosetta\n", "    # Score the refined pose with the standard ref2015 score function\n", "    pose = io.to_pose(ppose)\n", "    scorefxn = pyrosetta.create_score_function(\"ref2015\")\n", "    total_score = scorefxn(pose)\n", "    return pose, total_score\n", "\n", "\n", "if not os.getenv(\"DEBUG\"):\n", "    pose = pyrosetta.io.pose_from_file(\"inputs/5JG9.clean.pdb\")\n", "    # Keep only the first chain (chain A) of the crystal structure\n", "    keep_chA = pyrosetta.rosetta.protocols.grafting.simple_movers.KeepRegionMover(\n", "        res_start=str(pose.chain_begin(1)), res_end=str(pose.chain_end(1))\n", "    )\n", "    keep_chA.apply(pose)\n", "\n", "    output = []\n", "    # Sample 10 random positions, skip cysteines, and mutate the rest to ALA and TRP\n", "    for target in random.sample(range(1, pose.size() + 1), 10):\n", "        if pose.sequence()[target - 1] != \"C\":\n", "            for new_res in [\"ALA\", \"TRP\"]:\n", "                a = mutate(io.to_packed(pose), target, new_res)\n", "                b = refine(a)\n", "                c = score(b)\n", "                output.append((target, new_res, c[0], c[1]))\n", "\n", "    # Lazily find the index of the mutant with the lowest total score\n", "    delayed_obj = dask.delayed(np.argmin)([x[-1] for x in output])\n", "    display(delayed_obj.visualize())" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(output)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    delayed_result = delayed_obj.persist()\n", "    progress(delayed_result)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "The dask progress bar allows visualizing parallelization directly within the Jupyter notebook. Here is a snapshot of the dask progress bar while executing the previous cell:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(filename=\"inputs/dask_progress_bar_example.png\")" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if not os.getenv(\"DEBUG\"):\n", "    result = delayed_result.compute()\n", "    print(\"The mutation with the lowest energy is residue {0} at position {1}\".format(output[result][1], output[result][0]))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "*Note*: For best practices while using `dask.delayed`, see: http://docs.dask.org/en/latest/delayed-best-practices.html" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "\n", "< [Example of Using PyRosetta with GNU Parallel](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.03-GNU-Parallel-Via-Slurm.ipynb) | [Contents](toc.ipynb) | [Index](index.ipynb) | [Part I: Parallelized Global Ligand Docking with `pyrosetta.distributed`](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.05-Ligand-Docking-dask.ipynb) >" ] }
], "metadata": { "kernelspec": { "display_name": "Python [conda env:PyRosetta.notebooks]", "language": "python", "name": "conda-env-PyRosetta.notebooks-py" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.6" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": true, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 2 }