{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "*This notebook contains material from [PyRosetta](https://RosettaCommons.github.io/PyRosetta.notebooks);\n", "content is available [on Github](https://github.com/RosettaCommons/PyRosetta.notebooks.git).*" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "< [Example of Using PyRosetta with GNU Parallel](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.03-GNU-Parallel-Via-Slurm.ipynb) | [Contents](toc.ipynb) | [Index](index.ipynb) | [Part I: Parallelized Global Ligand Docking with `pyrosetta.distributed`](http://nbviewer.jupyter.org/github/RosettaCommons/PyRosetta.notebooks/blob/master/notebooks/16.05-Ligand-Docking-dask.ipynb) >
"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Examples Using the `dask` Module"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### We can make use of the `dask` library to parallelize code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Note:* This Jupyter notebook uses parallelization and is **not** meant to be executed within a Google Colab environment."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Note:* This Jupyter notebook requires the PyRosetta distributed layer which is obtained by building PyRosetta with the `--serialization` flag or installing PyRosetta from the RosettaCommons conda channel \n",
"\n",
"**Please see Chapter 16.00 for setup instructions**"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import dask\n",
"import dask.array as da\n",
"import graphviz\n",
"import logging\n",
"logging.basicConfig(level=logging.INFO)\n",
"import numpy as np\n",
"import os\n",
"import pyrosetta\n",
"import pyrosetta.distributed\n",
"import pyrosetta.distributed.dask\n",
"import pyrosetta.distributed.io as io\n",
"import random\n",
"import sys\n",
"\n",
"from dask.distributed import Client, LocalCluster, progress\n",
"from dask_jobqueue import SLURMCluster\n",
"from IPython.display import Image\n",
"\n",
"if 'google.colab' in sys.modules:\n",
" print(\"This Jupyter notebook uses parallelization and is therefore not set up for the Google Colab environment.\")\n",
" sys.exit(0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Initialize PyRosetta within this Jupyter notebook using custom command line PyRosetta flags:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"flags = \"\"\"-out:level 100\n",
"-ignore_unrecognized_res 1\n",
" -ignore_waters 0 \n",
" -detect_disulf 0 # Do not automatically detect disulfides\n",
"\"\"\" # These can be unformatted for user convenience, but no spaces in file paths!\n",
"pyrosetta.distributed.init(flags)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you are running this example on a high-performance computing (HPC) cluster with SLURM scheduling, use the `SLURMCluster` class described below. For more information, visit https://jobqueue.dask.org/en/latest/generated/dask_jobqueue.SLURMCluster.html. **Note**: If you are running this example on a HPC cluster with a job scheduler other than SLURM, `dask_jobqueue` also works with other job schedulers: http://jobqueue.dask.org/en/latest/api.html\n",
"\n",
"The `SLURMCluster` class in the `dask_jobqueue` module is very useful! In this case, we are requesting four workers using `cluster.scale(4)`, and specifying each worker to have:\n",
"- one thread per worker with `cores=1`\n",
"- one process per worker with `processes=1`\n",
"- one CPU per task per worker with `job_cpu=1`\n",
"- a total of 4GB memory per worker with `memory=\"4GB\"`\n",
"- itself run on the \"short\" queue/partition on the SLURM scheduler with `queue=\"short\"`\n",
"- a maximum job walltime of 3 hours using `walltime=\"03:00:00\"`\n",
"- output dask files directed to `local_directory`\n",
"- output SLURM log files directed to file path and file name (and any other SLURM commands) with the `job_extra` option\n",
"- pre-initialization with the same custom command line PyRosetta flags used in this Jupyter notebook, using the `extra=pyrosetta.distributed.dask.worker_extra(init_flags=flags)` option\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" scratch_dir = os.path.join(\"/net/scratch\", os.environ[\"USER\"])\n",
" cluster = SLURMCluster(\n",
" cores=1,\n",
" processes=1,\n",
" job_cpu=1,\n",
" memory=\"4GB\",\n",
" queue=\"short\",\n",
" walltime=\"02:59:00\",\n",
" local_directory=scratch_dir,\n",
" job_extra=[\"-o {}\".format(os.path.join(scratch_dir, \"slurm-%j.out\"))],\n",
" extra=pyrosetta.distributed.dask.worker_extra(init_flags=flags)\n",
" )\n",
" cluster.scale(4)\n",
" client = Client(cluster)\n",
"else:\n",
" cluster = None\n",
" client = None"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Note**: The actual sbatch script submitted to the Slurm scheduler under the hood was:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" print(cluster.job_script())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Otherwise, if you are running this example locally on your laptop, you can still spawn workers and take advantage of the `dask` module:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# cluster = LocalCluster(n_workers=1, threads_per_worker=1)\n",
"# client = Client(cluster)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Open the `dask` dashboard, which shows diagnostic information about the current state of your cluster and helps track progress, identify performance issues, and debug failures:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Consider the following example that runs within this Jupyter notebook kernel just fine but could be parallelized:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def inc(x):\n",
" return x + 1\n",
"\n",
"def double(x):\n",
" return x + 2\n",
"\n",
"def add(x, y):\n",
" return x + y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"output = []\n",
"for x in range(10):\n",
" a = inc(x)\n",
" b = double(x)\n",
" c = add(a, b)\n",
" output.append(c)\n",
"\n",
"total = sum(output)\n",
"print(total)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"With a slight modification, we can parallelize it on the HPC cluster using the `dask` module"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"output = []\n",
"for x in range(10):\n",
" a = dask.delayed(inc)(x)\n",
" b = dask.delayed(double)(x)\n",
" c = dask.delayed(add)(a, b)\n",
" output.append(c)\n",
"\n",
"delayed = dask.delayed(sum)(output)\n",
"print(delayed)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We used the `dask.delayed` function to wrap the function calls that we want to turn into tasks. None of the `inc`, `double`, `add`, or `sum` calls have happened yet. Instead, the object total is a `Delayed` object that contains a task graph of the entire computation to be executed.\n",
"\n",
"Let's visualize the task graph to see clear opportunities for parallel execution."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" delayed.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can now compute this lazy result to execute the graph in parallel:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" total = delayed.compute()\n",
" print(total)"
]
},
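{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see why no work happened until `compute()` was called, here is a toy, pure-Python sketch of lazy evaluation. It is not how `dask` is implemented: the `Lazy` class, the `resolve` helper, and the `toy_inc` and `toy_add` functions are hypothetical names invented for this illustration, and everything runs serially in this kernel. The point is only that building the nested object records the calls, while `compute()` walks the recorded graph and runs them. Because each `toy_add` branch depends only on its own `toy_inc` call, a real scheduler is free to run those branches on different workers."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class Lazy:\n",
"    # Toy stand-in for a delayed task: it records a call instead of running it.\n",
"    def __init__(self, func, *args):\n",
"        self.func = func\n",
"        self.args = args\n",
"\n",
"    def compute(self):\n",
"        # Evaluate any Lazy arguments first, then call the recorded function.\n",
"        return self.func(*[resolve(arg) for arg in self.args])\n",
"\n",
"\n",
"def resolve(value):\n",
"    # Recursively turn Lazy objects (also inside lists) into concrete values.\n",
"    if isinstance(value, Lazy):\n",
"        return value.compute()\n",
"    if isinstance(value, list):\n",
"        return [resolve(item) for item in value]\n",
"    return value\n",
"\n",
"\n",
"calls = []  # Log of executed calls, used to show when work actually happens\n",
"\n",
"def toy_inc(x):\n",
"    calls.append(\"inc\")\n",
"    return x + 1\n",
"\n",
"def toy_add(x, y):\n",
"    calls.append(\"add\")\n",
"    return x + y\n",
"\n",
"# Building the graph only records the calls; nothing is executed yet.\n",
"graph = Lazy(sum, [Lazy(toy_add, Lazy(toy_inc, x), x) for x in range(3)])\n",
"calls_before = len(calls)\n",
"\n",
"# Computing the graph runs every recorded call.\n",
"toy_total = graph.compute()\n",
"calls_after = len(calls)\n",
"print(calls_before, toy_total, calls_after)"
]
},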
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also use `dask.delayed` as a python function decorator for identical performance"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dask.delayed\n",
"def inc(x):\n",
" return x + 1\n",
"\n",
"@dask.delayed\n",
"def double(x):\n",
" return x + 2\n",
"\n",
"@dask.delayed\n",
"def add(x, y):\n",
" return x + y\n",
"\n",
"output = []\n",
"for x in range(10):\n",
" a = inc(x)\n",
" b = double(x)\n",
" c = add(a, b)\n",
" output.append(c)\n",
"\n",
"total = dask.delayed(sum)(output).compute()\n",
"print(total)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can also use the `dask.array` library, which implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many parallelizable small arrays.\n",
"\n",
"See `dask.array` documentation: http://docs.dask.org/en/latest/array.html, along with that of `dask.bag`, `dask.dataframe`, `dask.delayed`, `Futures`, etc."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" x = da.random.random((10000, 10000, 10), chunks=(1000, 1000, 5))\n",
" y = da.random.random((10000, 10000, 10), chunks=(1000, 1000, 5))\n",
" z = (da.arcsin(x) + da.arccos(y)).sum(axis=(1, 2))\n",
" z.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dask dashboard allows visualizing parallel computation, including progress bars for tasks. Here is a snapshot of the dask dashboard while executing the previous cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Image(filename=\"inputs/dask_dashboard_example.png\") "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For more info on interpreting the dask dashboard, see: https://distributed.dask.org/en/latest/web.html"
]
},
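{
"cell_type": "markdown",
"metadata": {},
"source": [
"Returning to the blocked algorithms mentioned for `dask.array`: the sketch below illustrates the idea with the Python standard library only. It is a toy under stated assumptions, not `dask.array` internals: `blocked_sum` and `toy_values` are hypothetical names, and a thread pool stands in for the dask workers, so no real speed-up should be expected for pure-Python sums. Each chunk is reduced independently and the partial results are then combined, which is the general shape of a chunked reduction."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def blocked_sum(values, chunk_size):\n",
"    # Cut the sequence into chunks of at most `chunk_size` items.\n",
"    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]\n",
"    # Reduce each chunk independently (the parallelizable step).\n",
"    with ThreadPoolExecutor(max_workers=4) as pool:\n",
"        partial_sums = list(pool.map(sum, chunks))\n",
"    # Combine the partial results into the final answer.\n",
"    return sum(partial_sums), len(chunks)\n",
"\n",
"toy_values = list(range(1000))\n",
"blocked_total, n_chunks = blocked_sum(toy_values, chunk_size=100)\n",
"print(blocked_total, n_chunks)"
]
},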
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Example Using `dask.delayed` with PyRosetta"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's look at a simple example of sending PyRosetta jobs to the `dask-worker`, and the `dask-worker` sending the results back to this Jupyter Notebook.\n",
"\n",
"We will use the crystal structure of the *de novo* mini protein gEHEE_06 from PDB ID 5JG9"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"@dask.delayed\n",
"def mutate(ppose, target, new_res):\n",
" import pyrosetta\n",
" pose = io.to_pose(ppose)\n",
" mutate = pyrosetta.rosetta.protocols.simple_moves.MutateResidue(target=target, new_res=new_res)\n",
" mutate.apply(pose)\n",
" return io.to_packed(pose)\n",
"\n",
"@dask.delayed\n",
"def refine(ppose):\n",
" import pyrosetta\n",
" pose = io.to_pose(ppose)\n",
" scorefxn = pyrosetta.create_score_function(\"ref2015_cart\")\n",
" mm = pyrosetta.rosetta.core.kinematics.MoveMap()\n",
" mm.set_bb(True)\n",
" mm.set_chi(True)\n",
" min_mover = pyrosetta.rosetta.protocols.minimization_packing.MinMover()\n",
" min_mover.set_movemap(mm)\n",
" min_mover.score_function(scorefxn)\n",
" min_mover.min_type(\"lbfgs_armijo_nonmonotone\")\n",
" min_mover.cartesian(True)\n",
" min_mover.tolerance(0.01)\n",
" min_mover.max_iter(200)\n",
" min_mover.apply(pose)\n",
" return io.to_packed(pose)\n",
"\n",
"@dask.delayed\n",
"def score(ppose):\n",
" import pyrosetta\n",
" pose = io.to_pose(ppose)\n",
" scorefxn = pyrosetta.create_score_function(\"ref2015\")\n",
" total_score = scorefxn(pose)\n",
" return pose, total_score\n",
"\n",
"\n",
"if not os.getenv(\"DEBUG\"):\n",
" pose = pyrosetta.io.pose_from_file(\"inputs/5JG9.clean.pdb\")\n",
" keep_chA = pyrosetta.rosetta.protocols.grafting.simple_movers.KeepRegionMover(\n",
" res_start=str(pose.chain_begin(1)), res_end=str(pose.chain_end(1))\n",
" )\n",
" keep_chA.apply(pose)\n",
"\n",
" #kwargs = {\"extra_options\": pyrosetta.distributed._normflags(flags)}\n",
"\n",
" output = []\n",
" for target in random.sample(range(1, pose.size() + 1), 10):\n",
" if pose.sequence()[target - 1] != \"C\":\n",
" for new_res in [\"ALA\", \"TRP\"]:\n",
" a = mutate(io.to_packed(pose), target, new_res)\n",
" b = refine(a)\n",
" c = score(b)\n",
" output.append((target, new_res, c[0], c[1]))\n",
"\n",
" delayed_obj = dask.delayed(np.argmin)([x[-1] for x in output])\n",
" delayed_obj.visualize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(output)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" delayed_result = delayed_obj.persist()\n",
" progress(delayed_result)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The dask progress bar allows visualizing parallelization directly within the Jupyter notebook. Here is a snapshot of the dask progress bar while executing the previous cell:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Image(filename=\"inputs/dask_progress_bar_example.png\") "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"if not os.getenv(\"DEBUG\"):\n",
" result = delayed_result.compute()\n",
" print(\"The mutation with the lowest energy is residue {0} at position {1}\".format(output[result][1], output[result][0]))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"*Note*: For best practices while using `dask.delayed`, see: http://docs.dask.org/en/latest/delayed-best-practices.html"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python [conda env:PyRosetta.notebooks]",
"language": "python",
"name": "conda-env-PyRosetta.notebooks-py"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
"nbformat_minor": 2
}