In [62]:
# early imports
import IPython
import numpy as np

# setup notebook extensions
!jupyter nbextension enable splitcell/splitcell
!jupyter nbextension enable rise/main

Enabling notebook extension splitcell/splitcell...
      - Validating: OK
Enabling notebook extension rise/main...
      - Validating: OK


![law logo](https://raw.githubusercontent.com/riga/law/master/logo.png)
[github.com/riga/law](https://github.com/riga/law)

*Design Pattern for Analysis Automation on Interchangeable,<br>
Distributed Resources using Luigi Analysis Workflows*

## A typical high-energy physics analysis

#### Search for ttH (H → bb) production ([1804.03682](https://arxiv.org/abs/1804.03682))

- ~500k different *tasks* to process
- Processing on the WLCG, HTCondor batch systems, GPU clusters, local machines
- O(100TB) *user* storage across several sites, local storage, DropBox
- Complex data flow due to sophisticated analysis techniques and requirements
    - BDTs, DNNs, Matrix Element Method
    - ~80 systematic variations
- Must be operable by everyone at any time

<center>
    <img src="images/tth.png" style="margin-top: 10%;" />
    <img src="images/dnn.png" />
</center>

## Landscape of HEP analyses

#### Metrics to describe analyses

- Scale: Measure of resource consumption and amount of data to be processed
    
- Complexity: Measure of granularity and inhomogeneity of workloads

<center>
    <img src="images/scale_complexity.png" width="500">
</center>

- Today's (and definitely future) analyses tend to be **both** large-scale **and** complex:
    - Undocumented requirements between workloads, only exist in the physicist’s head
    - Manual execution & steering of jobs, bookkeeping of data, ...
    - Error-prone & time-consuming

<center>
    <img src="images/ttbb_workflow.png">
</center>

<center>
    <img src="images/ttbb_workflow_zoom.png">
</center>

## Motivational questions

- Does the analysis depend on *where* it runs,<br> or *where* it stores data?<br>
**→ Decisions should not dictate code design!**

- When a Postdoc / PhD student leaves,<br> can someone else run the analysis,<br> is a new *framework* required?<br>
**→ Workflow often not documented, <br> only exists in the physicist's head!**

- After an analysis is published, are people investing time to preserve their work,<br> can it be repeated after O(weeks/months/years)?<br>
**→ Daily working environment must provide preservation features out-of-the-box!**

<center>
    <img src="images/remote.png">
    <img src="images/lost.png">
    <img src="images/preservation.png">
</center>

## <a href="https://github.com/spotify/luigi"><span style='color:#63cc10;'>luigi</span></a>

- Python package for building complex pipelines
- Development started at Spotify (2012),<br> now open-source and community driven

#### Simple building blocks

1. Workloads defined as <span style="color:blue; font-family:monospace;">Tasks</span>
2. <span style="color:blue; font-family:monospace;">Tasks</span> **require** other <span style="color:blue; font-family:monospace;">Tasks</span>
3. <span style="color:blue; font-family:monospace;">Tasks</span> **output** <span style="color:green; font-family:monospace;">Targets</span>
4. <span style="color:red; font-family:monospace;">Parameters</span> customize behavior

#### Benefits

- Only processes what is really necessary
- Error handling & automatic re-scheduling

<center>
    <img src="https://raw.githubusercontent.com/spotify/luigi/master/doc/luigi.png" width="250">
    <img src="images/scheduler.png">
</center>

## <span style='color:#63cc10;'>luigi</span> in a nutshell

In [3]:
# analysis.py

import luigi

class Inference(luigi.Task):
    
    split = luigi.ChoiceParameter(default="test", choices=("test", "valid", "train"))
    # parameters are translated into command-line interface arguments
    
    def requires(self):
        from mva import MVAEvaluation
        return MVAEvaluation(split=self.split)  # pass parameters upstream
    
    def output(self):
        return luigi.LocalTarget(f"data_{self.split}.h5")  # encode *significant* parameters into output path
    
    def run(self):
        outp = self.output()  # this is the LocalTarget from above
        inp = self.input()    # this is output() of MVAEvaluation
        
        do_whatever_an_inference_does(inp.path, outp.path)  # placeholder for the actual inference

## `make`-like execution system

```shell
luigi --module analysis Inference \
    --split test \
    --other-parameters ...
```
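Parameters declared on a task become command-line flags such as `--split` above; an underscore in a parameter name is written as a dash on the command line. The sketch below mimics that translation with the standard library's `argparse`. It is a simplified, hypothetical illustration rather than luigi's actual parser, and the `n_events` parameter is invented for the example.

```python
import argparse

# Hypothetical parameter declarations: name -> (type, default, choices).
# A default of None marks the parameter as required.
PARAMS = {
    "split": (str, "test", ("test", "valid", "train")),
    "n_events": (int, None, None),
}


def build_parser(task_name, params):
    """Translate parameter declarations into command-line flags."""
    parser = argparse.ArgumentParser(prog=task_name)
    for name, (ptype, default, choices) in params.items():
        flag = "--" + name.replace("_", "-")  # n_events -> --n-events
        parser.add_argument(flag, dest=name, type=ptype, default=default,
                            choices=choices, required=default is None)
    return parser


parser = build_parser("Inference", PARAMS)
args = parser.parse_args(["--split", "valid", "--n-events", "100"])
print(vars(args))  # {'split': 'valid', 'n_events': 100}

defaults = parser.parse_args(["--n-events", "5"])
print(defaults.split)  # test
```

Treating a parameter without default as required mirrors the `n = luigi.IntParameter()  # no default!` pattern used later in this notebook.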

#### What <span style='color:#63cc10;'>luigi</span> does

1. Create dependency tree for triggered task
2. Determine tasks to actually run:
    - Top-down tree traversal
    - Consider a task **complete** when all of its output `Target`s exist
3. Run tree with configurable number of workers
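The three steps above can be sketched in a few lines of plain Python. `MiniTask`, `WriteFile` and `schedule` are hypothetical names; the sketch uses a single worker and no central scheduler, and only shows the top-down completeness check and why existing outputs are not produced again.

```python
import os
import tempfile


class MiniTask:
    """Hypothetical minimal task: requires() other tasks, output() file paths."""

    def requires(self):
        return []

    def output(self):
        return []

    def complete(self):
        # complete when all outputs exist
        return all(os.path.exists(path) for path in self.output())

    def run(self):
        raise NotImplementedError


def schedule(task, plan=None):
    """Top-down traversal: stop at complete tasks, list the rest deps-first."""
    plan = [] if plan is None else plan
    if task in plan or task.complete():
        return plan  # a complete task and its whole subtree are skipped
    for dep in task.requires():
        schedule(dep, plan)
    plan.append(task)
    return plan


class WriteFile(MiniTask):
    def __init__(self, path, deps=()):
        self.path, self.deps = path, list(deps)

    def requires(self):
        return self.deps

    def output(self):
        return [self.path]

    def run(self):
        with open(self.path, "w") as f:
            f.write(os.path.basename(self.path))


tmp = tempfile.mkdtemp()
a = WriteFile(os.path.join(tmp, "a.txt"))
b = WriteFile(os.path.join(tmp, "b.txt"), deps=[a])
c = WriteFile(os.path.join(tmp, "c.txt"), deps=[a, b])

first = schedule(c)
for task in first:  # a single "worker" runs the plan in order
    task.run()
print([os.path.basename(t.path) for t in first])  # ['a.txt', 'b.txt', 'c.txt']

second = schedule(c)  # all outputs exist now
print(second)  # []

os.remove(c.path)  # delete only the final output
third = schedule(c)
print([os.path.basename(t.path) for t in third])  # ['c.txt']
```

Deleting only `c.txt` re-schedules only the final task, which is the "only processes what is really necessary" behavior listed among luigi's benefits.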

<center>
    <img src="images/example_tree1.png">
</center>

## Work of a B.Sc. student after 2 weeks 

<center>
    <img src="images/example_tree2.png">
</center>

# law: <span style='color:#63cc10;'>luigi</span> <span style='color:#3c69a1;'>analysis workflow</span> package

- <span style='color:#63cc10;'>luigi</span> is a perfect tool to model complex workflows: simple structure, easy to extend

- <span style='color:#63cc10;'>l</span><span style='color:#3c69a1;'>aw</span> *extends* <span style='color:#63cc10;'>luigi</span> (i.e. it does not replace it)

- **Main goal**: decouple algorithm code from<br> 1. *run locations*,<br> 2. *storage locations*, and<br> 3. *software environments*

- Provides a toolbox to follow a design pattern
- No constraints on data format, language, ...
- No fixation on dedicated resources
- Not a *framework*

<center>
    <img src="https://raw.githubusercontent.com/riga/law/master/logo.png"><br>
    <img src="images/workload.png" width="45%">
</center>

## Example task: multiply an input parameter by 2

Before we start, import <span style='color:#63cc10;'>luigi</span> and <span style='color:#63cc10;'>l</span><span style='color:#3c69a1;'>aw</span>, and load IPython magics to execute tasks from within notebooks:

In [69]:
# basic imports
import luigi
import law
import json

# load law ipython magics
law.contrib.load("ipython")
law.ipython.register_magics(log_level="INFO")
# drop-in replacement for base task with some interactive features
Task = law.ipython.Task

[0;49;32mINFO[0m: law.contrib.ipython.magic - magics successfully registered: %law, %ilaw


IPython magics:

- `%ilaw` runs a task inside the current session
- `%law` runs a task as a subprocess (not used in this notebook)

## Example task: multiply an input parameter by 2

In [46]:
class TimesTwo(Task):

    n = luigi.IntParameter()  # no default!
    
    def output(self):
        return law.LocalFileTarget(
            f"data/n{self.n}.json")
    
    def run(self):
        # method 1: the verbose way
        output = self.output()
        output.parent.touch()  # creates the data/ dir

        # define data to save
        # note: self.n is the value of the "n" parameter
        # and != self.__class__.n (parameter instance!)
        data = {"in": self.n, "out": self.n * 2}

        # pythonic way to save data
        with open(output.path, "w") as f:
            json.dump(data, f, indent=4)

In [None]:
%ilaw run TimesTwo --n 5

#### Inspect results with `--print-status <tree_depth>`

In [None]:
%ilaw run TimesTwo --n 5 --print-status 0

In [None]:
!cat data/n5.json

#### Remove results with `--remove-output <tree_depth>`

In [None]:
%ilaw run TimesTwo --n 5 --remove-output 0

In [None]:
!cat data/n5.json

## Example task: multiply an input parameter by 2, refactored

In [53]:
class TimesTwo(Task):

    n = luigi.IntParameter()  # no default!
    
    def output(self):
        return law.LocalFileTarget(
            f"data/n{self.n}.json")
    
    def run(self):
        # method 2: using target *formatters*
        data = {"in": self.n, "out": self.n * 2}
        self.output().dump(data, formatter="json",
                           indent=4)
        
        # all additional keyword arguments (here: indent)
        # are passed on to the formatter's "dump" implementation

        # variety of available formatters: yaml, numpy,
        # h5py, root, matplotlib, tensorflow, keras,
        # coffea, zip, tar, pickle, ...
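The formatter mechanism can be pictured as a registry that dispatches `dump`/`load` to format-specific functions and forwards extra keyword arguments such as `indent=4`. The sketch below uses only the standard library; `SketchFileTarget`, `FORMATTERS` and the lookup by file extension are assumptions for illustration, not law's implementation.

```python
import json
import os
import tempfile

# Hypothetical registry: formatter name -> (file extensions, dump, load)
FORMATTERS = {}


def register(name, extensions, dump, load):
    FORMATTERS[name] = (extensions, dump, load)


def _json_dump(path, obj, **kwargs):
    with open(path, "w") as f:
        json.dump(obj, f, **kwargs)  # e.g. indent=4 ends up here


def _json_load(path, **kwargs):
    with open(path) as f:
        return json.load(f, **kwargs)


def _text_dump(path, obj, **kwargs):
    with open(path, "w") as f:
        f.write(str(obj))


def _text_load(path, **kwargs):
    with open(path) as f:
        return f.read()


register("json", (".json",), _json_dump, _json_load)
register("text", (".txt",), _text_dump, _text_load)


def find_formatter(path, name="auto"):
    """Pick a formatter by name, or by file extension when name is "auto"."""
    if name != "auto":
        return FORMATTERS[name]
    ext = os.path.splitext(path)[1]
    for fmt in FORMATTERS.values():
        if ext in fmt[0]:
            return fmt
    raise ValueError(f"no formatter found for {path!r}")


class SketchFileTarget:
    def __init__(self, path):
        self.path = path

    def dump(self, obj, formatter="auto", **kwargs):
        find_formatter(self.path, formatter)[1](self.path, obj, **kwargs)

    def load(self, formatter="auto", **kwargs):
        return find_formatter(self.path, formatter)[2](self.path, **kwargs)


target = SketchFileTarget(os.path.join(tempfile.mkdtemp(), "n6.json"))
target.dump({"in": 6, "out": 12}, formatter="json", indent=4)
print(target.load())  # {'in': 6, 'out': 12}  (formatter picked from ".json")
```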

In [None]:
%ilaw run TimesTwo --n 6

## Abstract storage: <span style="color:#589af7;">remote targets</span>

- Idea: work with remote files / directories<br> as if they were local

- Remote `Target`s are based on the GFAL2 Python bindings and support all WLCG protocols (dCache, XRootD, GridFTP, SRM, ...) + DropBox

- Implement an API **identical** to that of local targets

- Automatic retries

- Round-robin (over different doors)

- Local caching
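Retries, round-robin over doors and local caching can be combined in one small toy model. `Door` and `CachedRemoteFile` are hypothetical classes that copy between local directories; they stand in for real storage endpoints and are not law's remote target classes.

```python
import itertools
import os
import shutil
import tempfile


class Door:
    """Hypothetical storage endpoint that fails its first `failures` transfers."""

    def __init__(self, name, root, failures=0):
        self.name, self.root, self.failures = name, root, failures

    def fetch(self, rel_path, dst):
        if self.failures > 0:
            self.failures -= 1
            raise OSError(f"{self.name}: transfer of {rel_path} failed")
        shutil.copy(os.path.join(self.root, rel_path), dst)


class CachedRemoteFile:
    def __init__(self, rel_path, doors, cache_dir, retries=3):
        self.rel_path = rel_path
        self.doors = itertools.cycle(doors)  # round-robin over doors
        self.cache_dir = cache_dir
        self.retries = retries
        self.attempts = []  # names of the doors that were tried

    def localize(self):
        """Return a local path, transferring only on a cache miss."""
        cached = os.path.join(self.cache_dir, os.path.basename(self.rel_path))
        if os.path.exists(cached):
            return cached
        error = None
        for _ in range(1 + self.retries):
            door = next(self.doors)
            self.attempts.append(door.name)
            try:
                door.fetch(self.rel_path, cached)
                return cached
            except OSError as exc:
                error = exc  # retry with the next door
        raise error


remote_root, cache_dir = tempfile.mkdtemp(), tempfile.mkdtemp()
with open(os.path.join(remote_root, "data.txt"), "w") as f:
    f.write("payload")

remote = CachedRemoteFile(
    "data.txt",
    doors=[Door("door1", remote_root, failures=1), Door("door2", remote_root)],
    cache_dir=cache_dir,
)
path = remote.localize()  # door1 fails, door2 succeeds
path = remote.localize()  # served from the cache, no further transfer
print(remote.attempts)    # ['door1', 'door2']
```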

<center>
    <img src="images/workload.png" width="50%">
</center>

## Example: DropBox targets

<br>

Configure DropBox access in law.cfg:

```
[dropbox]

base: dropbox://dropbox.com/my_dir
app_key: ...
app_secret: ...
access_token: ...
```

In [None]:
# load the dropbox and numpy contrib packages
law.contrib.load("dropbox", "numpy")

# define a directory
my_dir = law.dropbox.DropboxDirectoryTarget("/")

# save a numpy array in a new file
my_file = my_dir.child("data.npz", type="f")
my_file.dump(np.zeros((10, 20)), formatter="numpy")

# directory listing
my_dir.listdir()  # -> ["data.npz"]

# load the data again
zeros = my_file.load(formatter="numpy")

# play around with objects
my_file.parent == my_dir  # -> True

my_dir.child("other.txt", type="f").touch()
my_file.sibling("other.txt").exists()  # -> True

See [examples/dropbox_targets](https://github.com/riga/law/tree/master/examples/dropbox_targets) for examples and more information on configuring access to your DropBox.

## Abstract job interface: <span style="color:#f38f23;">remote workflows</span>

- Idea: submission built into tasks, **no need** to write extra code

- Currently supported job systems:<br>
HTCondor, LSF, gLite, ARC, (Slurm)

- Automatic resubmission

- Full job control (# tasks per job, # parallel jobs, # of job retries, early stopping, ...)

- Dashboard interface
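The bookkeeping behind "# tasks per job" and "# of job retries" can be modelled as chunking branches into jobs and resubmitting failed ones. This is a synchronous toy sketch: `process_jobs` and `fake_submit` are hypothetical, and a real batch system would be polled for job status instead of reporting success immediately.

```python
def chunk(branches, tasks_per_job):
    """Group branch numbers into jobs of at most tasks_per_job branches."""
    return [branches[i:i + tasks_per_job] for i in range(0, len(branches), tasks_per_job)]


def process_jobs(branches, tasks_per_job, max_retries, submit):
    """Submit all jobs, resubmitting failed ones up to max_retries times."""
    jobs = dict(enumerate(chunk(branches, tasks_per_job), start=1))
    attempts = dict.fromkeys(jobs, 0)
    pending = sorted(jobs)
    while pending:
        failed = []
        for job_num in pending:
            attempts[job_num] += 1
            if not submit(job_num, jobs[job_num]):
                failed.append(job_num)
        exhausted = [j for j in failed if attempts[j] > max_retries]
        if exhausted:
            raise RuntimeError(f"jobs {exhausted} failed after {max_retries} retries")
        pending = failed  # automatic resubmission of failed jobs
    return jobs, attempts


already_failed = set()


def fake_submit(job_num, branch_chunk):
    # hypothetical batch system: job 2 fails on its first submission only
    if job_num == 2 and job_num not in already_failed:
        already_failed.add(job_num)
        return False
    return True


jobs, attempts = process_jobs(list(range(26)), tasks_per_job=10, max_retries=2, submit=fake_submit)
print({num: (b[0], b[-1]) for num, b in jobs.items()})  # {1: (0, 9), 2: (10, 19), 3: (20, 25)}
print(attempts)  # {1: 1, 2: 2, 3: 1}
```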

<center>
    <img src="images/workload.png" width="50%">
</center>

## From the [htcondor_at_cern](https://github.com/riga/law/tree/master/examples/htcondor_at_cern) example

A `Workflow` task has **branches**, defined in `create_branch_map`:

- Add `--branch <n>` to the command line to run a *certain branch* locally
- Leave it empty to run **all** branches at a configurable location (local, HTCondor, ...)

The task to the right has 26 branches (branch 0 to 25), with each branch writing one character of the alphabet into a file.

In [None]:
# HTCondorWorkflow is the workflow base class defined in the htcondor_at_cern example
class CreateChars(Task, law.LocalWorkflow, HTCondorWorkflow):

    def create_branch_map(self):
        # map branch numbers 0-25 to
        # ascii numbers 97-122 (= a-z)
        return {
            i: num for i, num in
            enumerate(range(97, 123))
        }

    def output(self):
        return law.LocalFileTarget(
            f"output_{self.branch}.json")

    def run(self):
        # branch_data holds the integer number to convert
        char = chr(self.branch_data)

        # write the output
        self.output().dump({"char": char})

- `law run CreateChars --branch 0` would write `{"char": "a"}` into `output_0.json`.
- `law run CreateChars` would run all branches locally.
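Since the branch map is plain Python, the mapping and the per-branch payloads can be checked without law or any batch system. The snippet below only re-computes them standalone; it does not reflect how law executes branches.

```python
import json

# same mapping as CreateChars.create_branch_map: branch number -> ASCII code
branch_map = {i: num for i, num in enumerate(range(97, 123))}

# what each branch would write into its output_<branch>.json file
contents = {branch: json.dumps({"char": chr(num)}) for branch, num in branch_map.items()}

print(len(branch_map))            # 26
print(contents[0], contents[25])  # {"char": "a"} {"char": "z"}
```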

## Select HTCondor at execution time

![](images/htcondor.png)

Check out [examples/htcondor_at_cern](https://github.com/riga/law/tree/master/examples/htcondor_at_cern).

## Abstract software environments: <span style="color:#69ba3a;">sandboxes</span>

- Diverging software requirements between workloads are a great feature / challenge / problem

- Introduce sandboxing:<br> "Run entire task in different environment"

- Existing sandbox implementations:
    - Sub-shell with init file
    - Docker images
    - Singularity images
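The "sub-shell with init file" sandbox can be sketched with the standard library: source the init file in a fresh POSIX shell and run Python inside it, so the environment change never leaks into the calling process. `run_sandboxed` is a hypothetical helper; law's actual sandbox implementation involves more machinery (e.g. running the task itself inside the new environment), which is not modelled here.

```python
import os
import subprocess
import sys
import tempfile


def run_sandboxed(init_file, code):
    """Source init_file in a new sh process, then run Python code there."""
    # $1: init file, $2: python executable, $3: code ($0 is just a label)
    script = '. "$1" && exec "$2" -c "$3"'
    result = subprocess.run(
        ["sh", "-c", script, "sandbox", init_file, sys.executable, code],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.strip()


with tempfile.TemporaryDirectory() as tmp_dir:
    init_file = os.path.join(tmp_dir, "test_env1.sh")
    with open(init_file, "w") as f:
        f.write('export MY_VARIABLE="foo"\n')
    inside = run_sandboxed(init_file, 'import os; print(os.getenv("MY_VARIABLE"))')

print(inside)                    # foo
print(os.getenv("MY_VARIABLE"))  # unchanged in the calling process
```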
    
<br>
<br>
<center>
    <img src="images/singularity_docker.png">
</center>

<center>
    <img src="images/workload.png" width="35%">
    <img src="images/example_tree3.png" width="50%">
</center>

## Example: Bash sandboxes

In [79]:
import os

class SandboxExample(Task, law.SandboxTask):
    
    sandbox = "bash::test_env1.sh"
    
    # for docker container, use
    # sandbox = "docker::image_name"

    # for singularity container, use
    # sandbox = "singularity::image_name"
    
    def output(self):
        return law.LocalFileTarget("data/sandbox_variable.txt")

    # run() is executed inside the sandbox, i.e. after test_env1.sh was sourced
    def run(self):
        value = os.getenv("MY_VARIABLE")

        print(f"MY_VARIABLE: {value}")
        
        self.output().dump(value, formatter="text")
        

!cat test_env1.sh

#!/usr/bin/env bash

export MY_VARIABLE="foo"


In [78]:
%ilaw run SandboxExample
# mockup output, please run via command line :)

[0;49;32mINFO[0m: luigi-interface - Informed scheduler that task   SandboxExample__99914b932b   has status   PENDING
[0;49;32mINFO[0m: luigi-interface - Done scheduling tasks
[0;49;32mINFO[0m: luigi-interface - Running Worker with 1 processes
[0;49;32mINFO[0m: luigi-interface - [pid 3238510] Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) running   SandboxExample()
[0;49;32mINFO[0m: luigi-interface - [pid 3238510] Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) done      SandboxExample()
[0;49;32mINFO[0m: luigi-interface - Informed scheduler that task   SandboxExample__99914b932b   has status   DONE
[0;49;32mINFO[0m: luigi-interface - Worker Worker(salt=533918042, workers=1, host=lxplus806.cern.ch, username=mrieger, pid=3238510) was stopped. Shutting down Keep-Alive thread
[0;49;32mINFO[0m: luigi-interface - 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 

sandbox: bash::test_env1.sh
task   : SandboxExample__99914b932b

MY_VARIABLE: foo

sandbox: bash::test_env1.sh
task   : SandboxExample__99914b932b


## Summary

- Many HEP analyses are already large-scale and complex, and are likely to grow further in both respects
- Resource-agnostic workflow management and automation essential
- **Need for toolbox providing a design pattern, not another framework**

- <span style='color:#63cc10;'>luigi</span> is able to model even complex workflows
- <span style='color:#63cc10;'>l</span><span style='color:#3c69a1;'>aw</span> provides convenience & scalability on HEP infrastructure

- All information transparently encoded in tasks, targets & dependencies
- **Full automation** of end-to-end analyses

## Links

#### law
- Repository: [github.com/riga/law](https://github.com/riga/law)
- Paper: [arXiv:1706.00955](https://arxiv.org/abs/1706.00955) (CHEP proceedings)
- Documentation: [law.readthedocs.io](https://law.readthedocs.io) (in progress)
- Minimal example: [github.com/riga/law/tree/master/examples/loremipsum](https://github.com/riga/law/tree/master/examples/loremipsum)
- HTCondor example: [github.com/riga/law/tree/master/examples/htcondor_at_cern](https://github.com/riga/law/tree/master/examples/htcondor_at_cern)

#### luigi
- Repository: [github.com/spotify/luigi](https://github.com/spotify/luigi)
- Documentation: [luigi.readthedocs.io](https://luigi.readthedocs.io)
- "Hello world": [github.com/spotify/luigi/blob/master/examples/hello_world.py](https://github.com/spotify/luigi/blob/master/examples/hello_world.py)