## Parsl Tutorial

Parsl is a native Python library that allows you to write functions that execute in parallel and tie them together with dependencies to create workflows. Parsl wraps Python functions as "Apps" using the **@App** decorator. Decorated functions can run in parallel when all their inputs are ready.

For more comprehensive documentation and examples, please refer to our [documentation](http://parsl.readthedocs.io/en/latest/).

In [None]:
import parsl
from parsl import *
# parsl.set_stream_logger() # <-- log everything to stdout
print(parsl.__version__)

### DataFlowKernel

Parsl's `DataFlowKernel` acts as an abstraction layer over any pool of execution resources (e.g., clusters, clouds, threads). 


We'll come back to the DataFlowKernel later in this tutorial. For now, we configure this example to use a pool of [threads](https://en.wikipedia.org/wiki/Thread_(computing)) to facilitate local parallel execution.

In [None]:
local_config = {
 "sites" : [
 { "site" : "Threads",
 "auth" : { "channel" : None },
 "execution" : {
 "executor" : "threads",
 "provider" : None,
 "maxThreads" : 4
 }
 }],
 "globals" : {"lazyErrors" : True}
}

dfk = DataFlowKernel(config=local_config)

# 1. Apps


In Parsl an `app` is a piece of code that can be asynchronously executed on an execution resource (e.g., cloud, cluster, or local PC). Parsl provides support for pure Python apps and also command-line apps executed via Bash.

## Python Apps

As a first example let's define a simple Python function that returns the string 'Hello World!'. This function is made into a Parsl App using the **@App** decorator. The decorator specifies the type of App ('python'|'bash') and the `DataFlowKernel` object as arguments.

In [None]:
@App('python', dfk)
def hello ():
 return 'Hello World!'

print(hello().result())

As can be seen above, Apps wrap standard Python function calls. As such, they can be passed arbitrary arguments and return standard Python objects. 

In [None]:
@App('python', dfk)
def multiply (a, b):
 return a * b

print(multiply(5,9).result())

## Bash Apps

Parsl’s Bash app allows you to wrap execution of external applications from the command-line as you would in a Bash shell. It can also be used to execute Bash scripts directly. To define a Bash app the wrapped Python function must return the command-line string to be executed.

Parsl is able to capture stdout/stderr for debugging or as a first-class data object in a workflow.
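
To make the "return the command line" idea concrete, here is a minimal sketch of the pattern using only the standard library. It is not how Parsl implements Bash apps: `echo_hello_cmd` and `run_command` are names invented for this illustration, and the sketch assumes a POSIX shell is available.

```python
import subprocess

def echo_hello_cmd():
    # Like the body of a Bash app: return the command line, do not run it
    return 'echo "Hello World!"'

def run_command(command):
    # Hypothetical helper: run the string in a shell and capture both streams
    completed = subprocess.run(command, shell=True, capture_output=True, text=True)
    return completed.returncode, completed.stdout, completed.stderr

returncode, out, err = run_command(echo_hello_cmd())
print(returncode, out.strip())
```

The function that builds the command stays a plain Python function, which is why its arguments can be used to assemble the command string.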


In [None]:
@App('bash', dfk)
def echo_hello(stdout='echo-hello.stdout', stderr='echo-hello.stderr'):
 return 'echo "Hello World!"'

echo_hello().result()

with open('echo-hello.stdout', 'r') as f:
 print(f.read())

Often, Parsl Apps exchange data in the form of files. In order to orchestrate a dataflow it is important that Parsl is able to track the data that is passed into and out of an App. For this purpose Parsl Apps can define input and output files as follows.

We first create three test files named hello1.txt, hello2.txt, and hello3.txt containing the text "hello 1", "hello 2", and "hello 3".

In [None]:
!echo "hello 1" > /tmp/hello1.txt
!echo "hello 2" > /tmp/hello2.txt
!echo "hello 3" > /tmp/hello3.txt

We then write an App that will concatenate these files using `cat`. We pass in the list of hello files and concatenate the text into an output file named all_hellos.txt.

In [None]:
@App('bash', dfk)
def cat(inputs=[], outputs=[]):
 return 'cat %s > %s' %(inputs[0], outputs[0]) 

concat = cat(inputs=['/tmp/hello*.txt'], outputs=['all_hellos.txt'])

# Get the filepath for the output file and open it
with open(concat.outputs[0].result(), 'r') as f:
 print(f.read())

# 2. Futures
When a Python function is invoked, the Python interpreter waits for the function to complete execution and then returns the result. For long-running functions it may not be desirable to wait for completion; instead, it is often preferable for functions to run asynchronously. Parsl provides such asynchronous behavior by returning a future in lieu of a result. A future is essentially an object that tracks the status of an asynchronous task so that it can later be interrogated for its status, results, exceptions, etc.
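
The general idea of a future can be tried out with Python's standard `concurrent.futures` module, independent of Parsl. The sketch below submits two tasks to a thread pool and then interrogates their futures for a result and for an exception. The `divide` function is made up for this illustration, and whether Parsl's futures expose `exception()` in exactly this way is an assumption here; the tutorial itself only relies on `done()` and `result()`.

```python
from concurrent.futures import ThreadPoolExecutor

def divide(a, b):
    return a / b

with ThreadPoolExecutor(max_workers=2) as pool:
    ok = pool.submit(divide, 6, 3)    # submit() returns a future immediately
    bad = pool.submit(divide, 1, 0)   # this task will raise ZeroDivisionError

    ok_result = ok.result()           # blocks until the task has finished
    bad_error = bad.exception()       # blocks, then returns the raised exception

print(ok_result, type(bad_error).__name__)
```

Calling `bad.result()` instead would re-raise the exception in the calling thread.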

Parsl provides two types of futures: AppFutures and DataFutures. While related, these two types of futures enable subtly different workflow patterns, as we will see.



## AppFutures
AppFutures are the basic building block upon which Parsl scripts are built. Every invocation of a Parsl app returns an AppFuture which may be used to manage execution and control the workflow.

Here we show how AppFutures are used to wait for the result of a Python App.



In [None]:
# App that sleeps and then returns hello world
@App('python', dfk)
def hello ():
 import time
 time.sleep(5)
 return 'Hello World!'

app_future = hello()

# Check if the app_future is resolved
print ('Done: %s' % app_future.done())

# Print the result of the app_future. Note: this
# call will block and wait for the future to resolve
print ('Result: %s' % app_future.result())
print ('Done: %s' % app_future.done())

## DataFutures

While AppFutures represent the execution of an asynchronous app, the DataFuture represents the files it produces. Parsl’s dataflow model, in which data flows from one app to another via files, requires such a construct to enable apps to validate creation of required files and to subsequently resolve dependencies when input files are created. When invoking an app, Parsl requires that a list of output files be specified (using the outputs keyword argument). A DataFuture for each file is returned by the app when it is executed. Throughout execution of the app Parsl will monitor these files to 1) ensure they are created, and 2) pass them to any dependent apps.
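
As a rough mental model only, a future for a file can be pictured as a wrapper around the future of the task that writes it. The `FileFuture` class below is a hypothetical stand-in written for this illustration with the standard library; it is not Parsl's DataFuture and makes no claim about how Parsl monitors files.

```python
import os
import tempfile
from concurrent.futures import ThreadPoolExecutor

class FileFuture:
    """Hypothetical stand-in for a file-backed future (not Parsl's DataFuture)."""

    def __init__(self, parent, filepath):
        self.parent = parent          # future of the task that writes the file
        self.filepath = filepath

    def done(self):
        return self.parent.done()

    def result(self):
        self.parent.result()          # wait for the producing task
        if not os.path.exists(self.filepath):
            raise FileNotFoundError(self.filepath)
        return self.filepath          # resolve to the path of the created file

def write_message(message, path):
    with open(path, 'w') as f:
        f.write(message + '\n')

out_path = os.path.join(tempfile.mkdtemp(), 'hello.txt')
with ThreadPoolExecutor(max_workers=1) as pool:
    task = pool.submit(write_message, 'Hello World!', out_path)
    data_future = FileFuture(task, out_path)
    with open(data_future.result()) as f:
        contents = f.read()

print(contents.strip())
```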

In [None]:
# App that echoes the input message to the first file specified in the
# outputs list
@App('bash', dfk)
def slowecho(message, outputs=[]):
 return 'sleep 5; echo %s &> {outputs[0]}' % (message)

# Call echo specifying the output file
hello = slowecho('Hello World!', outputs=['hello1.txt'])

# The AppFuture's outputs attribute is a list of DataFutures
print(hello.outputs)

# Also check the AppFuture
print ('Done: %s' % hello.done())

# Print the contents of the output DataFuture when complete
with open(hello.outputs[0].result(), 'r') as f:
 print(f.read())
 
# Now that this is complete, check the DataFutures again, and the AppFuture
print(hello.outputs)
print ('Done: %s' % hello.done())

# 3. Data Management

Parsl is designed to enable implementation of dataflow patterns. These patterns enable workflows to be defined in which the data passed between apps manages the flow of execution. Dataflow programming models are popular as they can cleanly express, via implicit parallelism, the concurrency needed by many applications in a simple and intuitive way.

## Files

Parsl’s file abstraction abstracts local access to a file. It therefore requires only the file path to be defined. Irrespective of where the script or its apps are executed, Parsl uses this abstraction to access that file. When referencing a Parsl file in an app, Parsl maps the object to the appropriate access path.



In [None]:
from parsl.data_provider.files import File

# App that copies the contents of 1 or more files to another file
@App('bash', dfk)
def copy(inputs=[], outputs=[]):
 return 'cat %s &> %s' % (inputs[0], outputs[0])

# Create a test file (the with block closes it, so the contents are flushed to disk)
with open('cat-in.txt', 'w') as f:
    f.write('Hello World!\n')

# Create Parsl file objects
parsl_infile = File("cat-in.txt")
parsl_outfile = File("cat-out.txt")

# Call the copy app with the Parsl file
copy_future = copy(inputs=[parsl_infile], outputs=[parsl_outfile])

# Read what was redirected to the output file
with open(copy_future.outputs[0].result(), 'r') as f:
 print(f.read())

# 4. Composing a workflow

Now that we understand all the building blocks, we can create workflows with Parsl. Unlike other workflow systems, Parsl creates implicit workflows based on the passing of control or data between Apps. The flexibility of this model allows for the creation of a wide range of workflows from sequential through to complex nested, parallel workflows. As we will see below, a range of workflows can be created by passing AppFutures and DataFutures between Apps.
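
The notion of an implicit workflow can be illustrated without Parsl: if a task receives futures as arguments, it can only run once those futures have resolved, so the call pattern itself defines the dependency graph. The sketch below uses the standard library; `submit_when_ready`, `double`, and `add` are hypothetical names for this illustration, and the blocking wait inside a worker thread is a simplification, not a description of how Parsl schedules tasks.

```python
from concurrent.futures import Future, ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=4)

def submit_when_ready(func, *args):
    """Hypothetical helper: call func once every Future among args has resolved."""
    def resolve_and_call():
        # Replace each future argument with its result (blocks this worker thread)
        values = [a.result() if isinstance(a, Future) else a for a in args]
        return func(*values)
    return pool.submit(resolve_and_call)

def double(x):
    return 2 * x

def add(x, y):
    return x + y

a = submit_when_ready(double, 3)   # no dependencies
b = submit_when_ready(double, 4)   # independent of a, may run in parallel
c = submit_when_ready(add, a, b)   # depends on a and b through its arguments

answer = c.result()
pool.shutdown()
print(answer)  # → 14
```

No graph is declared anywhere: passing `a` and `b` into the third call is what orders the tasks.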


## Sequential workflow

Simple sequential or procedural workflows can be created by passing an AppFuture from one task to another. The following example shows one such workflow, which first generates a random number and then writes it to a file. 


In [None]:
# App that generates a random number
@App('python', dfk)
def generate(limit):
 from random import randint
 return randint(1,limit)

# App that writes a message to a file
@App('bash', dfk)
def save(message, outputs=[]):
 return 'echo %s &> {outputs[0]}' % (message)

# Generate the random number
message = generate(10)
print('Random number: %s' % message.result())

# Save the random number to a file
saved = save(message, outputs=['output.txt'])

# Print the output file
with open(saved.outputs[0].result(), 'r') as f:
 print('File contents: %s' % f.read())

## Parallel workflow

The most common way that Parsl Apps are executed in parallel is via looping. The following example shows how a simple loop can be used to create many random numbers in parallel.


In [None]:
# App that generates a random number
@App('python', dfk)
def generate(limit):
 from random import randint
 return randint(1,limit)

# Generate 5 random numbers
rand_nums = []
for i in range(5):
 rand_nums.append(generate(10))

# Wait for all apps to finish and collect the results
outputs = [i.result() for i in rand_nums]

# Print results
print(outputs)

## Parallel dataflow

Parallel dataflows can be developed by passing data between Apps. In this example we create a set of files, each containing a random number. We then concatenate these files into a single file and compute the sum of all numbers in that file. The first two Apps exchange files; the final App returns the sum as a Python integer.



In [None]:
# App that generates a random number
@App('bash', dfk)
def generate(outputs=[]):
 return "echo $(( RANDOM )) &> {outputs[0]}"

# App that concatenates input files into a single output file
@App('bash', dfk)
def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'):
 return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

# App that calculates the sum of values in a list of input files
@App('python', dfk)
def total(inputs=[]):
    total = 0
    with open(inputs[0], 'r') as f:
        for line in f:
            total += int(line)
    return total

# Create 5 files with random numbers
output_files = []
for i in range (5):
 output_files.append(generate(outputs=['random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0] for i in output_files], outputs=["all.txt"])

# Calculate the sum of the random numbers
total = total(inputs=[cc.outputs[0]])
print (total.result())

## Monte Carlo workflow

Many scientific applications use the [Monte Carlo method](https://en.wikipedia.org/wiki/Monte_Carlo_method#History) to compute results.

If a circle with radius $r$ is inscribed inside a square with side length $2r$ then the area of the circle is $\pi r^2$ and the area of the square is $(2r)^2$. Thus, if $N$ uniformly distributed random points are dropped within the square then approximately $N\pi/4$ will be inside the circle.
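
Spelling out the step from the areas to the estimate returned by the `pi()` app, with $N_{\text{in}}$ (a symbol introduced here) denoting the number of points that land inside the circle:

$$
\frac{\pi r^2}{(2r)^2} = \frac{\pi}{4}
\quad\Longrightarrow\quad
\frac{N_{\text{in}}}{N} \approx \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx \frac{4\,N_{\text{in}}}{N}
$$

The approximation improves as $N$ grows, which is why each estimate uses a large number of points.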

Each call to the function `pi()` is executed independently and in parallel. The `avg_points()` app is used to compute the average of the futures that were returned from the `pi()` calls.

The dependency chain looks like this:

```
App Calls     pi()    pi()    pi()
                \       |       /
Futures          a      b      c
                  \     |     /
App Call           avg_points()
                        |
Future               avg_pi
```

In [None]:
# App that estimates pi by placing points in a box
@App('python', dfk)
def pi(total):
    import random

    # Set the size of the box (edge length) in which we drop random points
    edge_length = 10000
    center = edge_length / 2
    c2 = center ** 2
    count = 0

    for i in range(total):
        # Drop a random point in the box.
        x, y = random.randint(1, edge_length), random.randint(1, edge_length)
        # Count points within the circle
        if (x - center)**2 + (y - center)**2 < c2:
            count += 1

    return (count * 4 / total)

# App that computes the average of the values
@App('python', dfk)
def avg_points(a, b, c):
 return (a + b + c)/3

# Estimate three values for pi
a, b, c = pi(10**6), pi(10**6), pi(10**6)

# Compute the average of the three estimates
avg_pi = avg_points(a, b, c)

# Print the results
print("A: {0:.5f} B: {1:.5f} C: {2:.5f}".format(a.result(), b.result(), c.result()))
print("Average: {0:.5f}".format(avg_pi.result()))

# 5. Execution and configuration

Parsl is designed to support arbitrary execution providers (e.g., PCs, clusters, supercomputers) and execution models (e.g., threads, pilot jobs, etc.). That is, Parsl scripts are independent of execution provider or executor. Instead, the configuration used to run the script tells Parsl how to execute apps on the desired environment. Parsl provides a high level abstraction, called a Block, for describing the resource configuration for a particular app or script.

Information about the different execution providers and executors supported is included in the [Parsl documentation](https://parsl.readthedocs.io/en/latest/userguide/execution.html).


## Local execution with threads

As we saw above, we can configure Parsl to execute apps on a local thread pool. This is a good way to parallelize execution on a local PC. The configuration object defines the sites that will be used for execution, optionally the authentication method to be used (e.g., if using SSH), and the execution model to use. In the case of threads we define the maximum number of threads to be used. A number of global configuration options may also be specified.
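
To get a feel for what a cap on the number of threads means, the sketch below runs twelve short tasks on a standard-library thread pool limited to four workers and records which thread ran each one. It assumes, without claiming to show Parsl's internals, that the thread limit in the configuration behaves like `max_workers` here; `MAX_THREADS` and `which_thread` are names invented for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

MAX_THREADS = 4  # plays the role of the thread limit in the configuration

def which_thread(_):
    time.sleep(0.05)                         # simulate a little work
    return threading.current_thread().name   # report the worker that ran the task

with ThreadPoolExecutor(max_workers=MAX_THREADS) as pool:
    names = list(pool.map(which_thread, range(12)))

distinct = set(names)
print(len(names), len(distinct) <= MAX_THREADS)
```

All twelve tasks complete, but never more than four run at the same time, so the remaining tasks wait in the pool's queue.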

In [None]:
threads_config = {
 "sites" : [
 { "site" : "Local_Threads",
 "auth" : { "channel" : None },
 "execution" : {
 "executor" : "threads",
 "provider" : None,
 "maxThreads" : 4
 }
 }],
 "globals" : {"lazyErrors" : True}
}

## Local execution with pilot jobs

We can also define a configuration that uses IPythonParallel as the executor. In this mode, pilot jobs are used to manage the submission. Parsl creates an IPythonParallel controller to manage execution and deploys one or more IPythonParallel engines (workers) to execute the workload. The following config will instantiate this infrastructure locally. It can be trivially extended to include a remote provider (e.g., Cori, Theta, etc.) for execution.

In [None]:
ipp_config = {
 "sites" : [{
 "site" : "Local_IPP",
 "auth" : {
 "channel" : "local"
 },
 "execution" : {
 "executor" : "ipp",
 "provider" : "local",
 "script_dir" : ".scripts",
 "scriptDir" : ".scripts",
 "block" : {
 "nodes" : 1,
 "taskBlocks" : 1,
 "walltime" : "00:05:00",
 "initBlocks" : 1,
 "minBlocks" : 1,
 "maxBlocks" : 1,
 "scriptDir" : ".",
 "options" : {
 "partition" : "debug"
 }
 }
 }
 }],
 "globals" : { "lazyErrors" : True },
}

## Running a workflow using a configuration

We can now run the same workflow using either of the two configurations defined above. Change which config is used to instantiate the DFK to see the same workflow executed with different models. 

In [None]:
import parsl
from parsl import *
# parsl.set_stream_logger() # <-- log everything to stdout
print(parsl.__version__)

#dfk = DataFlowKernel(config=threads_config)
dfk = DataFlowKernel(config=ipp_config)

@App('bash', dfk)
def generate(outputs=[]):
 return "echo $(( RANDOM )) &> {outputs[0]}"

@App('bash', dfk)
def concat(inputs=[], outputs=[], stdout="stdout.txt", stderr='stderr.txt'):
 return "cat {0} > {1}".format(" ".join(inputs), outputs[0])

@App('python', dfk)
def total(inputs=[]):
    total = 0
    with open(inputs[0], 'r') as f:
        for line in f:
            total += int(line)
    return total

# Create 5 files with random numbers
output_files = []
for i in range (5):
 output_files.append(generate(outputs=['random-%s.txt' % i]))

# Concatenate the files into a single file
cc = concat(inputs=[i.outputs[0].filepath for i in output_files], 
 outputs=["combined.txt"])

# Calculate the sum of the random numbers
total = total(inputs=[cc.outputs[0]])

print (total.result())
dfk.cleanup()