{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parsl: Advanced Features\n", "\n", "In this tutorial we present advanced features of Parsl including its ability to support multiple sites, elastically scale across sites, and its support for fault tolerance.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "from parsl import *\n", "#parsl.set_stream_logger() # <-- log everything to stdout\n", "print(parsl.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 1) Multiple Sites\n", "\n", "In the \"parsl-introduction\" notebook we showed how a configuration file controls the execution provider and model used to execute a Parsl script. While we showed only a single site, Parsl is capable of distributing workload over several sites simultaneously. Below we show an example configuration that combines local thread execution and local pilot job execution. By default, Apps will execute on any configured sites. However, you can also specify a specific site, or sites, on which an App can execute by adding a list of sites to the App decorator. In the following cells, we show a three-stage workflow in which the first app uses local threads, the second uses local pilot jobs, and the third (with no sites specified) will use either threads or pilot jobs." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, we define two \"sites\", which in this example are both local. The first uses threads, and the second uses pilot job execution. We then instantiate a DataFlowKernel object with these two sites." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define a configuration for using local threads and pilot jobs\n", "multi_site_config = {\n", " \"sites\" : [\n", " { \"site\" : \"Local_Threads\",\n", " \"auth\" : { \"channel\" : None },\n", " \"execution\" : {\n", " \"executor\" : \"threads\",\n", " \"provider\" : None,\n", " \"maxThreads\" : 4\n", " }\n", " }, {\n", " \"site\" : \"Local_IPP\",\n", " \"auth\" : {\n", " \"channel\" : \"local\"\n", " },\n", " \"execution\" : {\n", " \"executor\" : \"ipp\",\n", " \"provider\" : \"local\",\n", " \"script_dir\" : \".scripts\",\n", " \"scriptDir\" : \".scripts\",\n", " \"block\" : {\n", " \"nodes\" : 1,\n", " \"taskBlocks\" : 1,\n", " \"walltime\" : \"00:05:00\",\n", " \"initBlocks\" : 1,\n", " \"minBlocks\" : 0,\n", " \"maxBlocks\" : 1,\n", " \"scriptDir\" : \".\"\n", " }\n", " }\n", " }],\n", " \"globals\" : {\"lazyErrors\" : True}\n", "}\n", "\n", "dfk = DataFlowKernel(config=multi_site_config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we define three Apps, which have the same functionality as in the previous tutorial. However, the first is specified to use the first site only, the second is specific to use the second site only, and the third doesn't have a site specification, so it can run on any available site." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate app runs on the \"Local_Threads\" site\n", "@App('bash', dfk, sites=[\"Local_Threads\"])\n", "def generate(outputs=[]):\n", " return \"echo $(( RANDOM )) &> {outputs[0]}\"\n", "\n", "# Concat app runs on the \"Local_IPP\" site\n", "@App('bash', dfk, sites=[\"Local_IPP\"])\n", "def concat(inputs=[], outputs=[], stdout=\"stdout.txt\", stderr='stderr.txt'):\n", " return \"cat {0} > {1}\".format(\" \".join(inputs), outputs[0])\n", "\n", "# Total app runs on either site\n", "@App('python', dfk)\n", "def total(inputs=[]):\n", " total = 0\n", " with open(inputs[0], 'r') as f:\n", " for l in f:\n", " total += int(l)\n", " return total" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we run the apps, and cleanup." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create 5 files with random numbers\n", "output_files = []\n", "for i in range (5):\n", " output_files.append(generate(outputs=['random-%s.txt' % i]))\n", "\n", "# Concatenate the files into a single file\n", "cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=[\"all.txt\"])\n", "\n", "# Calculate the sum of the random numbers\n", "total = total(inputs=[cc.outputs[0]])\n", "\n", "print (total.result())\n", "\n", "dfk.cleanup()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 2) Elasticity\n", "\n", "As a Parsl script is evaluated, it creates a collection of tasks for asynchronous execution. In most cases this stream of tasks is variable as different stages of the workflow are evaluated. To address this variability, Parsl is able to monitor the flow of tasks and elastically provision resources, within user specified bounds, in response. \n", "\n", "In the following example, we declare the range of blocks to be provisioned from 0 to 10 (minBlocks and maxBlocks, respectively). We then set parallelism to 0.1, which means that Parsl will favor reusing resources rather than provisioning new resources. You should see that the app is executed on the same process ID. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_ipp = {\n", " \"sites\": [\n", " {\"site\": \"Local_IPP\",\n", " \"auth\": {\n", " \"channel\": None,\n", " },\n", " \"execution\": {\n", " \"executor\": \"ipp\",\n", " \"provider\": \"local\",\n", " \"block\": {\n", " \"nodes\": 1,\n", " \"taskBlocks\": 1,\n", " \"minBlocks\": 0,\n", " \"initBlocks\": 0,\n", " \"maxBlocks\": 10,\n", " \"parallelism\": 0.1,\n", " \"walltime\": \"00:20:00\"\n", " }\n", " }\n", " }]\n", "}\n", "\n", "dfk = DataFlowKernel(config=local_ipp.copy())\n", "\n", "@App(\"python\", dfk)\n", "def python_app():\n", " import time \n", " import os\n", " time.sleep(5)\n", " return \"(%s) Hello World!\" % os.getpid()\n", "\n", "results = {}\n", "for i in range(0, 10):\n", " results[i] = python_app()\n", "\n", "print(\"Waiting for results ....\")\n", "for i in range(0, 10):\n", " print(results[i].result())\n", "\n", "dfk.cleanup()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We now modify the parallelism option to 1. This configuration means that Parsl will favor elastic growth to execute as many tasks simultaineously as possible, up to the user defined limit. You can modify the parallelism between 0 and 1 to experiment with different scaling policies. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_ipp = {\n", " \"sites\": [\n", " {\"site\": \"Local_IPP\",\n", " \"auth\": {\n", " \"channel\": None,\n", " },\n", " \"execution\": {\n", " \"executor\": \"ipp\",\n", " \"provider\": \"local\",\n", " \"block\": {\n", " \"nodes\": 1,\n", " \"taskBlocks\": 1,\n", " \"minBlocks\": 0,\n", " \"initBlocks\": 0,\n", " \"maxBlocks\": 10,\n", " \"parallelism\": 1,\n", " \"walltime\": \"00:20:00\"\n", " }\n", " }\n", " }]\n", "}\n", "\n", "dfk = DataFlowKernel(config=local_ipp.copy())\n", "\n", "@App(\"python\", dfk)\n", "def python_app():\n", " import time \n", " import os\n", " time.sleep(5)\n", " return \"(%s) Hello World!\" % os.getpid()\n", "\n", "results = {}\n", "for i in range(0, 10):\n", " results[i] = python_app()\n", "\n", "print(\"Waiting for results ....\")\n", "for i in range(0, 10):\n", " print(results[i].result())\n", "\n", "dfk.cleanup()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## 3) Fault tolerance and caching\n", "\n", "Workflows are often re-executed for various reasons, including workflow or node failure, code errors, or extension of the workflow. It is inefficient to re-execute apps that have succesfully completed. Parsl provides two mechanisms to improve efficacy via app caching and/or workflow-level checkpointing. \n", "\n", "### App Caching\n", "\n", "When developing a workflow, developers often re-execute the same workflow with incremental changes. Often large fragments of the workflow are re-executed even though they have not been modified. This wastes not only time but also computational resources. App Caching solves this problem by caching results from apps that have completed so that they can be re-used. Caching is enabled by setting the `cache` argument to the App wrapper. Note: the cached result is returned only when the same function, with the same name, input arguments, and function body is called. If any of these are changed, a new result is computed and returned.\n", "\n", "The following example shows two calls to the `slow_message` app with the same message. You will see that the first call is slow (since the app sleeps for 5 seconds), but the second call returns immedidately (the app is not actually executed this time, so there is no sleep delay). \n", "\n", "Note: running this example in Jupyter notebooks will cache the results through subsequent executions of the cell. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_threads = {\n", " \"sites\" : [\n", " { \"site\" : \"Local_Threads\",\n", " \"auth\" : { \"channel\" : None },\n", " \"execution\" : {\n", " \"executor\" : \"threads\",\n", " \"provider\" : None,\n", " \"maxThreads\" : 4\n", " }\n", " }\n", " ]\n", "}\n", "\n", "dfk = DataFlowKernel(config=local_threads)\n", "@App('python', dfk, cache = True)\n", "def slow_message(message):\n", " import time \n", " time.sleep(5)\n", " return message\n", "\n", "# First call to slow_message will calcuate the value\n", "first = slow_message(\"Hello World\")\n", "print (\"First: %s\" % first.result())\n", "\n", "# Second call to slow_message with the same args will\n", "# return immediately\n", "second = slow_message(\"Hello World\")\n", "print (\"Second: %s\" % second.result())\n", "\n", "# Third call to slow_message with different arguments\n", "# will again wait\n", "third = slow_message(\"Hello World!\")\n", "print (\"Third: %s\" % third.result())\n", "\n", "dfk.cleanup()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Checkpointing\n", "\n", "Parsl's checkpointing model enables workflow state to be saved and then used at a later time to resume execution from that point. Checkpointing provides workflow-level fault tolerance, insuring against failure of the Parsl control process. \n", "\n", "Parsl implements an incremental checkpointing model: each explicit checkpoint will save state changes from the previous checkpoint. Thus, the full history of a workflow may be distributed across multiple checkpoints.\n", "\n", "Checkpointing uses App caching to store results. Thus, the same caveats apply to non-deterministic functions. That is, the checkpoint saves results for an instance of an App when it has the same name, arguments, and function body. \n", "\n", "In this example we demonstrate how to automatically checkpoint workflows when tasks succesfully execute. This is enabled in the config by setting `checkpointMode` to `task_exit`. Other checkpointing models are described in the [checkpointing documentation](https://parsl.readthedocs.io/en/latest/userguide/checkpoints.html).\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_threads = {\n", " \"sites\" : [\n", " { \"site\" : \"Local_Threads\",\n", " \"auth\" : { \"channel\" : None },\n", " \"execution\" : {\n", " \"executor\" : \"threads\",\n", " \"provider\" : None,\n", " \"maxThreads\" : 4\n", " }\n", " }\n", " ], \n", " \"globals\": {\"lazyErrors\": True,\n", " \"memoize\": True,\n", " \"checkpointMode\": \"task_exit\",\n", " }\n", "}\n", "\n", "dfk = DataFlowKernel(config=local_threads)\n", "\n", "@App('python', dfk, cache=True)\n", "def slow_double(x):\n", " import time\n", " time.sleep(5)\n", " return x * 2\n", "\n", "d = []\n", "for i in range(5):\n", " d.append(slow_double(i))\n", "\n", "# wait for results\n", "print([d[i].result() for i in range(5)])\n", "\n", "dfk.cleanup()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To restart from a previous checkpoint the DFK must be configured with the appropriate checkpoint file. In most cases this is likley to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n", "\n", "In this example we reload the most recent checkpoint and attempt to run the same workflow. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "To restart from a previous checkpoint, the DFK must be configured with the appropriate checkpoint file. In most cases this is likely to be the most recent checkpoint file created. The following approach works with any checkpoint file, irrespective of which checkpointing method was used to create it. \n", "\n", "In this example we reload the most recent checkpoint and attempt to run the same workflow. The results return immediately as there is no need to re-execute each app. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "last_runid = sorted(os.listdir('runinfo/'))[-1]\n", "last_checkpoint = os.path.abspath('runinfo/{0}/checkpoint'.format(last_runid))\n", "\n", "print(\"Restarting from checkpoint: %s\" % last_checkpoint)\n", "dfk = DataFlowKernel(config=local_threads,\n", "                     checkpointFiles=[last_checkpoint])\n", "\n", "# Rerun the same workflow\n", "d = []\n", "for i in range(5):\n", "    d.append(slow_double(i))\n", "\n", "# wait for results\n", "print([d[i].result() for i in range(5)])\n", "\n", "dfk.cleanup()" ] } ], "metadata": { "kernelspec": { "display_name": "Python [default]", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" } }, "nbformat": 4, "nbformat_minor": 2 }