{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Parsl tutorial\n", "\n", "Parsl is a native Python library that allows you to write functions that execute in parallel and tie them together with dependencies to create workflows. Parsl wraps Python functions as \"Apps\" using the **@python_app** decorator, and Apps that call external applications using the **@bash_app** decorator. Decorated functions can run in parallel when all their inputs are ready.\n", "\n", "For more comprehensive documentation and examples, please refer our [documentation](http://parsl.readthedocs.io/en/latest/)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import parsl\n", "import os\n", "from parsl.app.app import python_app, bash_app\n", "from parsl.configs.local_threads import config\n", "\n", "#parsl.set_stream_logger() # <-- log everything to stdout\n", "\n", "print(parsl.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configuring Parsl\n", "\n", "Parsl separates code and execution. To do so, it relies on a configuration model to describe the pool of resources to be used for execution (e.g., clusters, clouds, threads). \n", "\n", "\n", "We'll come back to configuration later in this tutorial. For now, we configure this example to use a local pool of [threads](https://en.wikipedia.org/wiki/Thread_computing) to facilitate local parallel execution. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "parsl.load(config)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Apps\n", "\n", "\n", "In Parsl an `app` is a piece of code that can be asynchronously executed on an execution resource (e.g., cloud, cluster, or local PC). Parsl provides support for pure Python apps (`python_app`) and also command-line apps executed via Bash (`bash_app`).\n", "\n", "### Python Apps\n", "\n", "As a first example, let's define a simple Python function that returns the string 'Hello World!'. This function is made into a Parsl App using the **@python_app** decorator. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "@python_app\n", "def hello ():\n", " return 'Hello World!'\n", "\n", "print(hello().result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As can be seen above, Apps wrap standard Python function calls. As such, they can be passed arbitrary arguments and return standard Python objects. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python_app\n", "def multiply(a, b):\n", " return a * b\n", "\n", "print(multiply(5, 9).result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As Parsl apps are potentially executed remotely, they must contain all required dependencies in the function body. For example, if an app requires the time library, it should import that library within the function. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python_app\n", "def slow_hello ():\n", " import time\n", " time.sleep(5)\n", " return 'Hello World!'\n", "\n", "print(slow_hello().result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Bash Apps\n", "\n", "Parsl’s Bash app allows you to wrap execution of external applications from the command-line as you would in a Bash shell. It can also be used to execute Bash scripts directly. 
To define a Bash app, the wrapped Python function must return the command-line string to be executed.\n", "\n", "As a first example of a Bash app, let's use the Linux command `echo` to print the string 'Hello World!'. This function is made into a Bash App using the **@bash_app** decorator.\n", "\n", "Note that the `echo` command will print 'Hello World!' to stdout. In order to use this output, we need to tell Parsl to capture stdout. This is done by specifying the `stdout` keyword argument in the app function. The same approach can be used to capture `stderr`.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@bash_app\n", "def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):\n", "    return 'echo \"Hello World!\"'\n", "\n", "echo_hello().result()\n", "\n", "with open('echo-hello.stdout', 'r') as f:\n", "    print(f.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Passing data\n", "\n", "Parsl Apps can exchange data as Python objects (as shown above) or in the form of files. In order to enforce dataflow semantics, Parsl must track the data that is passed into and out of an App. To make Parsl aware of these dependencies, the app function includes `inputs` and `outputs` keyword arguments.\n", "\n", "We first create three test files named hello-0.txt, hello-1.txt, and hello-2.txt containing the text \"hello 0\", \"hello 1\", and \"hello 2\"." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i in range(3):\n", "    with open(os.path.join(os.getcwd(), 'hello-{}.txt'.format(i)), 'w') as f:\n", "        f.write('hello {}\\n'.format(i))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We then write an App that will concatenate these files using `cat`. We pass in the list of hello files (`inputs`) and concatenate their text into a new file named all_hellos.txt (`outputs`). As we describe below, we use Parsl File objects to abstract file locations in the event the `cat` app is executed on a different computer." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from parsl.data_provider.files import File\n", "\n", "@bash_app\n", "def cat(inputs=[], outputs=[]):\n", "    return 'cat {} > {}'.format(\" \".join([i.filepath for i in inputs]), outputs[0])\n", "\n", "concat = cat(inputs=[File(os.path.join(os.getcwd(), 'hello-0.txt')),\n", "                     File(os.path.join(os.getcwd(), 'hello-1.txt')),\n", "                     File(os.path.join(os.getcwd(), 'hello-2.txt'))],\n", "             outputs=[File(os.path.join(os.getcwd(), 'all_hellos.txt'))])\n", "\n", "# Open the concatenated file\n", "with open(concat.outputs[0].result(), 'r') as f:\n", "    print(f.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Futures\n", "When a normal Python function is invoked, the Python interpreter waits for the function to complete execution and then returns the results. In the case of long-running functions, it may not be desirable to wait for completion. Instead, it is preferable that functions are executed asynchronously. Parsl provides such asynchronous behavior by returning a future in lieu of results. A future is essentially an object that allows Parsl to track the status of an asynchronous task so that it may, in the future, be interrogated to find the task's status, results, exceptions, etc.\n", "\n", "Parsl provides two types of futures: AppFutures and DataFutures. While related, these two types of futures enable subtly different workflow patterns, as we will see."
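] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A future can also be interrogated for failure. If an app raises an exception, calling `result()` on its future re-raises that exception in the main script, so errors can be handled with an ordinary `try`/`except`. The cell below is a minimal sketch of this pattern (the `failing` app is hypothetical and introduced only for illustration); it assumes the thread-pool configuration loaded earlier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Hypothetical app that always fails, used only to illustrate error handling\n", "@python_app\n", "def failing():\n", "    raise ValueError('something went wrong inside the app')\n", "\n", "fail_future = failing()\n", "\n", "# done() reports status without blocking\n", "print('Done: {}'.format(fail_future.done()))\n", "\n", "# result() blocks until the app finishes and re-raises any exception it raised\n", "try:\n", "    fail_future.result()\n", "except Exception as e:\n", "    print('The app failed with: {}'.format(e))"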
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### AppFutures\n", "AppFutures are the basic building block upon which Parsl scripts are built. Every invocation of a Parsl app returns an AppFuture, which may be used to manage execution of the app and control the workflow.\n", "\n", "Here we show how AppFutures are used to wait for the result of a Python App." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "@python_app\n", "def hello ():\n", " import time\n", " time.sleep(5)\n", " return 'Hello World!'\n", "\n", "app_future = hello()\n", "\n", "# Check if the app_future is resolved, which it won't be\n", "print('Done: {}'.format(app_future.done()))\n", "\n", "# Print the result of the app_future. Note: this\n", "# call will block and wait for the future to resolve\n", "print('Result: {}'.format(app_future.result()))\n", "print('Done: {}'.format(app_future.done()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### DataFutures\n", "\n", "While AppFutures represent the execution of an asynchronous app, DataFutures represent the files it produces. Parsl’s dataflow model, in which data flows from one app to another via files, requires such a construct to enable apps to validate creation of required files and to subsequently resolve dependencies when input files are created. When invoking an app, Parsl requires that a list of output files be specified (using the `outputs` keyword argument). A DataFuture for each file is returned by the app when it is executed. Throughout execution of the app, Parsl will monitor these files to 1) ensure they are created, and 2) pass them to any dependent apps." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# App that echos an input message to an output file\n", "@bash_app\n", "def slowecho(message, outputs=[]):\n", " return 'sleep 5; echo %s &> %s' % (message, outputs[0])\n", "\n", "# Call slowecho specifying the output file\n", "hello = slowecho('Hello World!', outputs=[File(os.path.join(os.getcwd(), 'hello-world.txt'))])\n", "\n", "# The AppFuture's outputs attribute is a list of DataFutures\n", "print(hello.outputs)\n", "\n", "# Also check the AppFuture\n", "print('Done: {}'.format(hello.done()))\n", "\n", "# Print the contents of the output DataFuture when complete\n", "with open(hello.outputs[0].result(), 'r') as f:\n", " print(f.read())\n", " \n", "# Now that this is complete, check the DataFutures again, and the Appfuture\n", "print(hello.outputs)\n", "print('Done: {}'.format(hello.done()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Data Management\n", "\n", "Parsl is designed to enable implementation of dataflow patterns. These patterns enable workflows, in which the data passed between apps manages the flow of execution, to be defined. Dataflow programming models are popular as they can cleanly express, via implicit parallelism, the concurrency needed by many applications in a simple and intuitive way.\n", "\n", "### Files\n", "\n", "Parsl’s file abstraction abstracts access to a file irrespective of where the app is executed. When referencing a Parsl file in an app (by calling `filepath`), Parsl translates the path to the file's location relative to the file system on which the app is executing." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from parsl.data_provider.files import File\n", "\n", "# App that copies the contents of a file to another file\n", "@bash_app\n", "def copy(inputs=[], outputs=[]):\n", " return 'cat %s &> %s' % (inputs[0], outputs[0])\n", "\n", "# Create a test file\n", "open(os.path.join(os.getcwd(), 'cat-in.txt'), 'w').write('Hello World!\\n')\n", "\n", "# Create Parsl file objects\n", "parsl_infile = File(os.path.join(os.getcwd(), 'cat-in.txt'),)\n", "parsl_outfile = File(os.path.join(os.getcwd(), 'cat-out.txt'),)\n", "\n", "# Call the copy app with the Parsl file\n", "copy_future = copy(inputs=[parsl_infile], outputs=[parsl_outfile])\n", "\n", "# Read what was redirected to the output file\n", "with open(copy_future.outputs[0].result(), 'r') as f:\n", " print(f.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Remote Files\n", "\n", "The Parsl file abstraction can also represent remotely accessible files. In this case, you can instantiate a file object using the remote location of the file. Parsl will implictly stage the file to the execution environment before executing any dependent apps. Parsl will also translate the location of the file into a local file path so that any dependent apps can access the file in the same way as a local file. Parsl supports files that are accessible via Globus, FTP, and HTTP. \n", "\n", "Here we create a File object using a publicly accessible file with random numbers. We can pass this file to the `sort_numbers` app in the same way we would a local file. " ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "from parsl.data_provider.files import File\n", "\n", "@python_app\n", "def sort_numbers(inputs=[]):\n", " with open(inputs[0].filepath, 'r') as f:\n", " strs = [n.strip() for n in f.readlines()]\n", " strs.sort()\n", " return strs\n", "\n", "unsorted_file = File('https://raw.githubusercontent.com/Parsl/parsl-tutorial/master/input/unsorted.txt')\n", "\n", "f = sort_numbers(inputs=[unsorted_file])\n", "print (f.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Composing a workflow\n", "\n", "Now that we understand all the building blocks, we can create workflows with Parsl. Unlike other workflow systems, Parsl creates implicit workflows based on the passing of control or data between Apps. The flexibility of this model allows for the creation of a wide range of workflows from sequential through to complex nested, parallel workflows. As we will see below, a range of workflows can be created by passing AppFutures and DataFutures between Apps.\n", "\n", "\n", "### Sequential workflow\n", "\n", "Simple sequential or procedural workflows can be created by passing an AppFuture from one task to another. The following example shows one such workflow, which first generates a random number and then writes it to a file. 
\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# App that generates a random number\n", "@python_app\n", "def generate(limit):\n", " from random import randint\n", " return randint(1,limit)\n", "\n", "# App that writes a variable to a file\n", "@bash_app\n", "def save(variable, outputs=[]):\n", " return 'echo %s &> %s' % (variable, outputs[0])\n", "\n", "# Generate a random number between 1 and 10\n", "random = generate(10)\n", "print('Random number: %s' % random.result())\n", "\n", "# Save the random number to a file\n", "saved = save(random, outputs=[File(os.path.join(os.getcwd(), 'sequential-output.txt'))])\n", "\n", "# Print the output file\n", "with open(saved.outputs[0].result(), 'r') as f:\n", " print('File contents: %s' % f.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parallel workflow\n", "\n", "The most common way that Parsl Apps are executed in parallel is via looping. The following example shows how a simple loop can be used to create many random numbers in parallel. Note that this takes 5 seconds to run (the time needed for the longest delay), not the 15 seconds that would be needed if these generate functions were called and returned in sequence.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# App that generates a random number after a delay\n", "@python_app\n", "def generate(limit,delay):\n", " from random import randint\n", " import time\n", " time.sleep(delay)\n", " return randint(1,limit)\n", "\n", "# Generate 5 random numbers between 1 and 10\n", "rand_nums = []\n", "for i in range(5):\n", " rand_nums.append(generate(10,i))\n", "\n", "# Wait for all apps to finish and collect the results\n", "outputs = [i.result() for i in rand_nums]\n", "\n", "# Print results\n", "print(outputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Parallel dataflow\n", "\n", "Parallel dataflows can be developed by passing data between Apps. In this example we create a set of files, each with a random number, we then concatenate these files into a single file and compute the sum of all numbers in that file. The calls to the first App each create a file, and the second App reads these files and creates a new one. 
The final App returns the sum as a Python integer.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# App that generates a semi-random number between 0 and 32,767\n", "@bash_app\n", "def generate(outputs=[]):\n", "    return \"echo $(( RANDOM )) &> {}\".format(outputs[0])\n", "\n", "# App that concatenates input files into a single output file\n", "@bash_app\n", "def concat(inputs=[], outputs=[]):\n", "    return \"cat {0} > {1}\".format(\" \".join([i.filepath for i in inputs]), outputs[0])\n", "\n", "# App that calculates the sum of values in a list of input files\n", "@python_app\n", "def total(inputs=[]):\n", "    total = 0\n", "    with open(inputs[0], 'r') as f:\n", "        for l in f:\n", "            total += int(l)\n", "    return total\n", "\n", "# Create 5 files with semi-random numbers in parallel\n", "output_files = []\n", "for i in range(5):\n", "    output_files.append(generate(outputs=[File(os.path.join(os.getcwd(), 'random-{}.txt'.format(i)))]))\n", "\n", "# Concatenate the files into a single file\n", "cc = concat(inputs=[i.outputs[0] for i in output_files],\n", "            outputs=[File(os.path.join(os.getcwd(), 'all.txt'))])\n", "\n", "# Calculate the sum of the random numbers\n", "total = total(inputs=[cc.outputs[0]])\n", "print(total.result())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Examples\n", "\n", "### Monte Carlo workflow\n", "\n", "Many scientific applications use the [Monte Carlo method](https://en.wikipedia.org/wiki/Monte_Carlo_method#History) to compute results.\n", "\n", "One example is calculating $\\pi$ by randomly placing points in a box and using the ratio of points that fall inside an inscribed circle.\n", "\n", "Specifically, if a circle with radius $r$ is inscribed inside a square with side length $2r$, the area of the circle is $\\pi r^2$ and the area of the square is $(2r)^2$.\n", "\n", "Thus, if $N$ uniformly distributed random points are dropped within the square, approximately $N\\pi/4$ of them will fall inside the circle.\n", "\n", "Each call to the function `pi()` is executed independently and in parallel. 
The `mean()` app is used to compute the average of the three estimates returned by the `pi()` calls.\n", "\n", "The dependency chain looks like this:\n", "\n", "```\n", "App Calls    pi()  pi()  pi()\n", "               \\    |    /\n", "Futures        a    b    c\n", "                \\   |   /\n", "App Call         mean()\n", "                    |\n", "Future            mean_pi\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# App that estimates pi by placing points in a box\n", "@python_app\n", "def pi(num_points):\n", "    from random import random\n", "\n", "    inside = 0\n", "    for i in range(num_points):\n", "        x, y = random(), random()  # Drop a random point in the box.\n", "        if x**2 + y**2 < 1:        # Count points within the circle.\n", "            inside += 1\n", "\n", "    return (inside * 4 / num_points)\n", "\n", "# App that computes the mean of three values\n", "@python_app\n", "def mean(a, b, c):\n", "    return (a + b + c) / 3\n", "\n", "# Estimate three values for pi\n", "a, b, c = pi(10**6), pi(10**6), pi(10**6)\n", "\n", "# Compute the mean of the three estimates\n", "mean_pi = mean(a, b, c)\n", "\n", "# Print the results\n", "print(\"a: {:.5f} b: {:.5f} c: {:.5f}\".format(a.result(), b.result(), c.result()))\n", "print(\"Average: {:.5f}\".format(mean_pi.result()))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Execution and configuration\n", "\n", "Parsl is designed to support arbitrary execution providers (e.g., PCs, clusters, supercomputers, clouds) and execution models (e.g., threads, pilot jobs). The configuration used to run the script tells Parsl how to execute apps on the desired environment. Parsl provides a high-level abstraction, called a Block, for describing the resource configuration for a particular app or script.\n", "\n", "Information about the different execution providers and executors supported is included in the [Parsl documentation](https://parsl.readthedocs.io/en/latest/userguide/execution.html).\n", "\n", "So far in this tutorial, we've used a built-in configuration for running with threads. Below, we will illustrate how to create configs for different environments.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local execution with threads\n", "\n", "As we saw above, we can configure Parsl to execute apps on a local thread pool. This is a good way to parallelize execution on a local PC. The configuration object defines the executors that will be used as well as other options such as the authentication method (e.g., if using SSH), checkpoint files, and executor-specific configuration. In the case of threads, we define the maximum number of threads to be used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from parsl.config import Config\n", "from parsl.executors.threads import ThreadPoolExecutor\n", "\n", "local_threads = Config(\n", "    executors=[\n", "        ThreadPoolExecutor(\n", "            max_threads=8,\n", "            label='local_threads'\n", "        )\n", "    ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local execution with pilot jobs\n", "\n", "We can also define a configuration that uses Parsl's HighThroughputExecutor. In this mode, pilot jobs are used to manage task submission: Parsl creates an interchange to manage execution and deploys one or more workers to execute tasks. The following config instantiates this infrastructure locally; it can be extended to include a remote provider (e.g., the Cori or Theta supercomputers) for execution.
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from parsl.providers import LocalProvider\n", "from parsl.channels import LocalChannel\n", "from parsl.config import Config\n", "from parsl.executors import HighThroughputExecutor\n", "\n", "local_htex = Config(\n", " executors=[\n", " HighThroughputExecutor(\n", " label=\"htex_Local\",\n", " worker_debug=True,\n", " cores_per_worker=1,\n", " provider=LocalProvider(\n", " channel=LocalChannel(),\n", " init_blocks=1,\n", " max_blocks=1,\n", " ),\n", " )\n", " ],\n", " strategy=None,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "parsl.clear()\n", "#parsl.load(local_threads)\n", "parsl.load(local_htex)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "@bash_app\n", "def generate(outputs=[]):\n", " return \"echo $(( RANDOM )) &> {}\".format(outputs[0])\n", "\n", "@bash_app\n", "def concat(inputs=[], outputs=[]):\n", " return \"cat {0} > {1}\".format(\" \".join(i.filepath for i in inputs), outputs[0])\n", "\n", "@python_app\n", "def total(inputs=[]):\n", " total = 0\n", " with open(inputs[0], 'r') as f:\n", " for l in f:\n", " total += int(l)\n", " return total\n", "\n", "# Create 5 files with semi-random numbers\n", "output_files = []\n", "for i in range (5):\n", " output_files.append(generate(outputs=[File(os.path.join(os.getcwd(), 'random-%s.txt' % i))]))\n", "\n", "# Concatenate the files into a single file\n", "cc = concat(inputs=[i.outputs[0] for i in output_files], \n", " outputs=[File(os.path.join(os.getcwd(), 'combined.txt'))])\n", "\n", "# Calculate the sum of the random numbers\n", "total = total(inputs=[cc.outputs[0]])\n", "\n", "print (total.result())" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" } }, "nbformat": 4, "nbformat_minor": 4 }