{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# **Parsl: Workflow Tutorial**\n", "\n", "In this tutorial, you will first try a few Parsl dataflows (examples 1-4) on your local machine to get a sense of the library. Then, in examples 5-7, you will run similar dataflows on any resource you have access to, such as clouds (e.g., Amazon Web Services), HPC systems, and clusters, to see how more complex workflows can be expressed with Parsl." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Tutorial Section One\n", "\n", "This section walks through getting a simple \"mock\" science application running with Parsl on your local machine.\n", "\n", "Note: the mock science apps are included in the `app` directory of the tutorial repository." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 1: Run a single application using Parsl\n", "\n", "The first Parsl script runs the mock `simulate` application (`app/simulate`) to generate a single random number, which it writes to stdout. You can run the following script repeatedly to see different \"simulated\" results. 
\n", "\n", "![Part 1](figures/part01.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def mysim(stdout=\"output/p1.out\", stderr=\"output/p1.err\"):\n", "    \"\"\"Set this example up as a bash app by returning the\n", "    command line app to be called, in this case simulate\"\"\"\n", "    return \"app/simulate\"\n", "\n", "# call the mysim app and wait for the result\n", "mysim().result()\n", "\n", "# open the output file and read the result\n", "with open('output/p1.out', 'r') as f:\n", "    print(f.read())\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Debugging\n", "\n", "Debugging parallel dataflows is often more complicated than debugging serial programs. The easiest way to debug a Parsl application is via Python loggers. The following example shows how to enable logging; in this case we log the debug stream to the console. You can also log to a file using the `set_file_logger` function.\n", "\n", "When debugging individual apps, it is often best to first capture stdout and stderr to files. These files record any text the app prints, which can reveal what the app is doing. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "from parsl import set_stream_logger, NullHandler\n", "\n", "# set the stream logger to print debug messages\n", "set_stream_logger()\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def mysim(stdout=\"output/p1-debug.out\", stderr=\"output/p1-debug.err\"):\n", "    \"\"\"Set this example up as a bash app by returning the\n", "    command line app to be called, in this case simulate\"\"\"\n", "    return \"app/simulate\"\n", "\n", "mysim().result()\n", "\n", "with open('output/p1-debug.out', 'r') as f:\n", "    print(f.read())\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 2: Run a single Python function using Parsl\n", "\n", "The second example mirrors the first; however, instead of calling an external application, it uses a Python function to simulate a science app, again by generating a random number. In this case the Python app returns the simulated value as a Python object rather than writing it to an external file." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "parsl.load(config)\n", "\n", "@python_app\n", "def mysim():\n", "    \"\"\"Generate a random integer and return it\"\"\"\n", "    # import inside the app so the function is self-contained when run by a worker\n", "    from random import randint\n", "    return randint(1, 100)\n", "\n", "print(mysim().result())\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 3: Running an ensemble of many apps in parallel with a loop\n", "\n", "The third example shows how Parsl can be used to run many simulations in parallel. In this case, we again define a simulation app (`mysim`) that calls `app/simulate`. 
The Python script then calls this app in a loop, and Parsl executes the independent instances of the app in parallel. Note: rather than writing to stdout, the simulation app redirects its output to a specified file.\n", "\n", "![Part 3](figures/part03.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def mysim(outputs=[], stdout=\"output/p3.out\", stderr=\"output/p3.err\"):\n", "    \"\"\"Call simulate and return results in the output file\"\"\"\n", "    return 'app/simulate > {0}'.format(outputs[0])\n", "\n", "# loop to execute the simulation app 5 times\n", "results = []\n", "for i in range(5):\n", "    out_file = \"output/p3_sim_{0}\".format(i)\n", "    results.append(mysim(outputs=[out_file]))\n", "\n", "# print each job status; initially most or all apps will still be running\n", "print(\"Job Status: {}\".format([r.done() for r in results]))\n", "\n", "# wait for all apps to complete\n", "[r.result() for r in results]\n", "\n", "# print each job status; all apps are now finished\n", "print(\"Job Status: {}\".format([r.done() for r in results]))\n", "\n", "# collect the output files and print their values\n", "outputs = [r.outputs[0] for r in results]\n", "for o in outputs:\n", "    with open(o.filename, 'r') as f:\n", "        print(f.read().strip())\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 4: Analyzing results of a parallel ensemble\n", "\n", "After all the parallel simulations in an ensemble run have completed, it is typically necessary to gather and analyze their results with some kind of post-processing analysis program or script. The fourth example introduces such a post-processing step. 
In this case, the files created by all of the parallel runs of `simulate` will be averaged by the \"analysis application\" `stats`.\n", "\n", "Note: in this example we do not block on the outputs of the simulation apps; instead, their output data futures are passed directly to the analysis application. Parsl tracks these dependencies and executes the analysis app only after all of the simulation apps have completed.\n", "\n", "![Part 4](figures/part04.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def mysim(outputs=[],\n", "          stdout=\"output/p4_sim.out\",\n", "          stderr=\"output/p4_sim.err\"):\n", "    \"\"\"Call simulate and return results in the output file\"\"\"\n", "    return 'app/simulate > {0}'.format(outputs[0])\n", "\n", "\n", "@bash_app\n", "def stats(inputs=[],\n", "          outputs=[],\n", "          stderr='output/p4_stats.err',\n", "          stdout='output/p4_stats.out'):\n", "    \"\"\"Call stats with all simulation results as inputs\"\"\"\n", "    # str() turns each resolved input File into its file path\n", "    return \"app/stats {0} > {1}\".format(\" \".join(str(i) for i in inputs), outputs[0])\n", "\n", "\n", "# call the simulation app 5 times\n", "results = []\n", "for i in range(5):\n", "    out_file = \"output/p4_sim_{0}\".format(i)\n", "    results.append(mysim(outputs=[out_file]))\n", "\n", "# collect the output data futures; passing futures (rather than plain paths)\n", "# lets Parsl hold the stats app until every simulation has finished\n", "sim_outputs = [r.outputs[0] for r in results]\n", "\n", "# run the stats app\n", "s = stats(inputs=sim_outputs, outputs=[\"output/p4_stats.txt\"])\n", "\n", "# wait for the result\n", "s.result()\n", "\n", "# print the result\n", "with open('output/p4_stats.txt', 'r') as f:\n", "    print(f.read())\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Tutorial Section Two\n", "\n", "This section introduces execution of Parsl scripts on remote computational resources. 
\n", "\n", "Parsl supports a variety of resource providers as well as methods for submitting workloads to those resources (e.g., pilot jobs). Example configurations for compute resources such as UChicago Midway, NERSC Cori, NCSA Blue Waters, and ANL Cooley are included in the `config` directory of the tutorial repository.\n", "\n", "A Parsl script can be executed from a login node by using these configuration files directly. Parsl also supports a more advanced submission model in which an SSH channel is used to submit workloads from an external machine (e.g., your laptop). In this case the machine on which the script is executed must be reachable from the cluster (i.e., you will need to ensure that firewall rules allow pilot jobs to connect back to your laptop)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 5: Running a simple app using pilot jobs\n", "\n", "In this example we show how one might develop a script to run on a remote resource. We first develop a Parsl script for sorting a file and run it on a laptop using the local ThreadPoolExecutor. We then extend the example to submit the job via the HighThroughputExecutor pilot job model, first locally and then on a remote resource. Finally, we show how Parsl's SSH channel can be used to connect to the remote resource directly from your laptop.\n", "\n", "This script uses the `sort` application to sort the numbers in an unsorted file.\n", "\n", "### Example 5.a: Running the sort app locally using threads\n", "\n", "First we use the local thread executor to run the `sort` command locally."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def sort(unsorted,\n", "         outputs=[],\n", "         stderr='output/p5_a_sort.err',\n", "         stdout='output/p5_a_sort.out'):\n", "    \"\"\"Call sort executable on the input file\"\"\"\n", "    return \"sort -g {} > {}\".format(unsorted, outputs[0])\n", "\n", "# call the sort app on the unsorted.txt file\n", "# save the results to a_sorted.txt\n", "s = sort(\"input/unsorted.txt\", outputs=[\"output/a_sorted.txt\"])\n", "\n", "# wait for the output file to be produced\n", "output_file = s.outputs[0].result()\n", "\n", "# print the contents of the unsorted and sorted files\n", "print(\"Contents of the unsorted.txt file:\")\n", "with open('input/unsorted.txt', 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "print(\"\\nContents of the sorted output file:\")\n", "with open(output_file, 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 5.b: Running the sort app locally using pilot jobs\n", "\n", "We now use the HighThroughputExecutor to run the `sort` command using a pilot job model. In this case we use the `htex_local` configuration to tell Parsl to use a local HighThroughputExecutor.\n", "\n", "You will notice that apart from the configuration loaded by `parsl.load` (which instantiates the DataFlowKernel), this script is essentially identical to the previous one. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.htex_local import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def sort(unsorted: str,\n", "         outputs: list = [],\n", "         stderr: str = 'output/p5_b_sort.err',\n", "         stdout: str = 'output/p5_b_sort.out'):\n", "    \"\"\"Call sort executable on the input file\"\"\"\n", "    return \"sort -g {} > {}\".format(unsorted, outputs[0])\n", "\n", "s = sort(\"input/unsorted.txt\", outputs=[\"output/b_sorted.txt\"])\n", "\n", "output_file = s.outputs[0].result()\n", "\n", "print(\"Contents of the unsorted.txt file:\")\n", "with open('input/unsorted.txt', 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "print(\"\\nContents of the sorted output file:\")\n", "with open(output_file, 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 5.c: Running the sort app on a cluster using pilot jobs\n", "\n", "We now take the previous example and run it on a cluster using the HighThroughputExecutor. To run this example you will need to execute the script from a login node and import the configuration for your cluster (the script below imports `config.midway`; substitute the module for your site).\n", "\n", "Note: you will need to either install the Parsl library on that login node or use an existing environment. You will also need to download the parsl-tutorial repository and ensure that it is available on the worker nodes via a shared file system."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "import os\n", "\n", "# replace config.midway with the configuration for your cluster\n", "from config.midway import config\n", "\n", "parsl.load(config)\n", "\n", "@bash_app\n", "def sort(unsorted, outputs=[]):\n", "    \"\"\"Call sort executable on the input file\"\"\"\n", "    return \"sort -g {} > {}\".format(unsorted, outputs[0])\n", "\n", "# use absolute paths so the worker nodes can locate the files on the shared file system\n", "s = sort(os.path.abspath(\"input/unsorted.txt\"),\n", "         outputs=[os.path.abspath(\"output/sorted_c.txt\")])\n", "\n", "output_file = s.outputs[0].result()\n", "\n", "print(\"Contents of the unsorted.txt file:\")\n", "with open('input/unsorted.txt', 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "print(\"\\nContents of the sorted output file:\")\n", "with open(output_file, 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Example 5.d: Running the sort app on remote resources using pilot jobs\n", "\n", "In the previous example we needed to execute the script from a login node. Parsl can also submit work over an SSH channel. However, this requires that the machine on which the script is executed be able to host the HighThroughputExecutor interchange and therefore be reachable from the remote machine (e.g., network access, firewall rules, etc.).\n", "\n", "You will also need to configure your SSH agent so that Parsl can create the SSH connection to the login node. 
In a terminal, type:\n", "\n", "    $ ssh-add\n", "\n", "In this example we use a Python app to first create the unsorted file, and then we use the Parsl SSH channel to explicitly stage the output file back to the host machine.\n", "\n", "To run this script you must update the `shared_dir`, `username`, and `hostname` variables as well as the configuration details below.\n", "\n", "#### Running on a shared notebook server\n", "If you are running this notebook in a shared environment (e.g., as part of a tutorial) you will need to configure SSH agent forwarding. First, enable agent forwarding on your host machine by adding the following entry to your SSH config file:\n", "\n", "    $ vi ~/.ssh/config\n", "\n", "```\n", "Host \n", "    ForwardAgent yes\n", "```\n", "\n", "Then SSH to that host while forwarding your agent:\n", "\n", "    $ ssh-add\n", "\n", "    $ ssh -A \n", "\n", "Finally, on the remote machine, find the `SSH_AUTH_SOCK` path and paste it into this notebook (see the commented-out `SSH_AUTH_SOCK` line in the code below):\n", "\n", "    $ echo $SSH_AUTH_SOCK" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from parsl import set_stream_logger, NullHandler\n", "from parsl.channels import SSHChannel\n", "\n", "# set the stream logger to print debug messages\n", "#set_stream_logger()\n", "\n", "from config.midway import config\n", "\n", "import os\n", "\n", "# directory shared with worker nodes\n", "shared_dir = \"SHARED_DIR\"\n", "username = 'USERNAME'\n", "hostname = 'midway.rcc.uchicago.edu'\n", "\n", "ssh_config = SSHChannel(hostname=hostname,\n", "                        username=username,\n", "                        script_dir=shared_dir)\n", "\n", "config.executors[0].provider.channel = ssh_config\n", "\n", "# if using a shared Jupyter environment with agent forwarding\n", "#os.environ['SSH_AUTH_SOCK'] = '/tmp/ssh-uBccLlzXVT/agent.6420'\n", "parsl.load(config)\n", "\n", "@python_app\n", "def create_unsorted_file(outputs=[]):\n", "    \"\"\"Create an unsorted file by generating random numbers\"\"\"\n", "    from random import 
randint\n", "    with open(outputs[0], 'w') as f:\n", "        for i in range(50):\n", "            f.write(\"{0}\\n\".format(randint(1, 100)))\n", "\n", "@bash_app\n", "def sort(unsorted, outputs=[]):\n", "    \"\"\"Call sort executable on the input file\"\"\"\n", "    return \"sort -g {0} > {1}\".format(unsorted, outputs[0])\n", "\n", "# create the unsorted file\n", "unsorted = create_unsorted_file(outputs=[os.path.join(shared_dir, \"unsorted-generated.txt\")])\n", "\n", "# sort the file into a new file called sorted_d.txt\n", "s = sort(unsorted.outputs[0],\n", "         outputs=[os.path.join(shared_dir, \"sorted_d.txt\")])\n", "\n", "# wait for the app to complete\n", "output_file = s.outputs[0].result()\n", "\n", "# use the SSH channel created above to copy the sorted file to the current directory\n", "ssh_config.pull_file(output_file, '.')\n", "with open(os.path.basename(output_file), 'r') as f:\n", "    print(f.read().replace(\"\\n\",\",\"))\n", "\n", "parsl.clear()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example 7: MPI Hello\n", "\n", "The final example is a basic \"Hello World!\" example that shows how to run MPI applications. Here we have a simple MPI code, `mpi_hello.c`, in which each MPI rank sleeps for a user-specified duration and then prints the name of the processor on which it is executing, followed by \"Hello World!\".\n", "\n", "The following script is designed to be run from the login node. As in the previous examples, you will need to update the `shared_dir` variable. You may optionally use an SSH channel by following the previous instructions; if you run from the login node, you will not need the remote configuration details.\n", "\n", "Because different sites install different MPI libraries, you will need to specify the site-specific MPI compiler (e.g., mpicc, cc) and the way to launch the MPI job (e.g., mpirun, srun). 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl.app.app import python_app, bash_app\n", "from config.midway import config\n", "\n", "import os\n", "\n", "parsl.load(config)\n", "\n", "remote = False\n", "shared_dir = 'SHARED_DIRECTORY'  # path to the tutorial repository\n", "\n", "@bash_app\n", "def compile_app(dirpath, stdout=None, stderr=None, compiler=\"mpicc\"):\n", "    \"\"\"Compile mpi app using site-specific compiler.\n", "    E.g., midway compiler = mpicc, Cori compiler = cc\n", "    \"\"\"\n", "    return '''cd {0}; make clean; make CC={1} '''.format(dirpath, compiler)\n", "\n", "@bash_app\n", "def mpi_hello(dirpath, launcher=\"mpirun\", app=\"mpi_hello\", nproc=20, outputs=[]):\n", "    \"\"\"Launch the compiled mpi executable with the site's MPI launcher.\n", "    Supported launchers: mpirun (default) and srun\"\"\"\n", "    import os\n", "    executable = os.path.join(dirpath, app)\n", "    if launcher == \"mpirun\":\n", "        return \"mpirun -np {} {} &> {};\".format(nproc, executable, outputs[0])\n", "    elif launcher == \"srun\":\n", "        return \"srun -n {} {} &> {};\".format(nproc, executable, outputs[0])\n", "    else:\n", "        raise ValueError(\"Unsupported launcher: {}\".format(launcher))\n", "\n", "# compile the app and wait for it to complete (.result())\n", "compile_app(dirpath=os.path.join(shared_dir, \"mpi_apps\"),\n", "            stdout=os.path.join(shared_dir, \"mpi_apps.compile.out\"),\n", "            stderr=os.path.join(shared_dir, \"mpi_apps.compile.err\"),\n", "            compiler='mpicc').result()\n", "\n", "# run the mpi app\n", "hello = mpi_hello(os.path.join(shared_dir, \"mpi_apps\"),\n", "                  launcher=\"mpirun\",\n", "                  outputs=[os.path.join(shared_dir, \"mpi_apps\", \"hello.txt\")])\n", "\n", "output_file = hello.outputs[0].result()\n", "\n", "# if running remotely using SSH, copy the file back to the host\n", "# and read the local copy instead\n", "if remote:\n", "    config.executors[0].provider.channel.pull_file(output_file, '.')\n", "    output_file = os.path.basename(output_file)\n", "\n", "# read the result file\n", "with open(output_file, 'r') as f:\n", "    
print(f.read())\n", "\n", "parsl.clear()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }