# Cluster API

IPython Parallel 7 adds a `Cluster` API for starting and stopping clusters.

This is the new implementation of `ipcluster`,
which can be more easily re-used in Python programs.
The `ipcluster` script is

Controllers and Engines are started with "Launchers",
which are objects representing a running process.

Each **Cluster** has:

- a **cluster id**
- a **profile directory**
- one **controller**
- zero or more **engine sets**
  - each of which has one or more **engines**
 
The combination of `profile_dir` and `cluster_id` uniquely identifies a cluster.
You can have many clusters in one profile, but each must have a distinct cluster id.

To create a cluster, instantiate a Cluster object:

In [1]:
from ipyparallel import Cluster

cluster = Cluster()
cluster



To start the cluster:

In [2]:
await cluster.start_controller()
cluster


In [3]:
engine_set_id = await cluster.start_engines(n=4)
cluster

Starting 4 engines with 


, engine_sets=['1623263481-w75s'])>

As you can see, all methods on the Cluster object are async by default.
Every async method also has a `_sync` variant, for when you don't want to (or can't) use asyncio.

In [4]:
engine_set_2 = cluster.start_engines_sync(n=2)
engine_set_2

Starting 2 engines with 


'1623263483-iafz'

At this point, we have a cluster with a controller and six engines in two groups.
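The startup steps above can be folded into one coroutine. This is a minimal sketch rather than part of ipyparallel: `start_in_groups` is a hypothetical helper name, and it assumes `cluster` is a `Cluster` whose controller has not been started yet. It relies only on the `start_controller` and `start_engines` calls shown above.

```python
async def start_in_groups(cluster, sizes):
    """Start the controller, then one engine set per entry in `sizes`.

    Returns the engine set ids in the order the sets were started.
    """
    await cluster.start_controller()
    engine_set_ids = []
    for n in sizes:
        # start_engines returns the id of the engine set it created
        engine_set_ids.append(await cluster.start_engines(n=n))
    return engine_set_ids
```

In a notebook this would be called as `ids = await start_in_groups(Cluster(), [4, 2])`, which should leave you with the same controller-plus-six-engines layout as the cells above.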

There is also a `start_cluster` method that starts the controller and one engine set, for convenience:

```python
engine_set_id = await cluster.start_cluster(n=4)
```

We can get a client object connected to the cluster with `connect_client()`:

In [5]:
rc = cluster.connect_client()
rc.wait_for_engines(6)
rc.ids

[0, 1, 2, 3, 4, 5]

And we can use the classic `apply_async(...).get_dict()` pattern to get a dict, keyed by engine id, with the hostname and pid of each engine:

In [6]:
def identify():
    import os
    import socket

    return {"host": socket.gethostname(), "pid": os.getpid()}


rc[:].apply_async(identify).get_dict()

{0: {'host': 'touchy', 'pid': 81944},
 1: {'host': 'touchy', 'pid': 81945},
 2: {'host': 'touchy', 'pid': 81946},
 3: {'host': 'touchy', 'pid': 81947},
 4: {'host': 'touchy', 'pid': 81952},
 5: {'host': 'touchy', 'pid': 81953}}
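Once the per-engine dict is back on the client it is plain Python data, so it can be regrouped locally. The sketch below uses a hypothetical helper, `engines_by_host`, to collect engine ids per hostname; the sample input is copied from the output above.

```python
from collections import defaultdict


def engines_by_host(identities):
    """Map each hostname to the sorted engine ids running on it."""
    by_host = defaultdict(list)
    for engine_id, info in identities.items():
        by_host[info["host"]].append(engine_id)
    return {host: sorted(ids) for host, ids in by_host.items()}


# Sample input copied from the output above (first two engines only)
sample = {
    0: {"host": "touchy", "pid": 81944},
    1: {"host": "touchy", "pid": 81945},
}
print(engines_by_host(sample))  # {'touchy': [0, 1]}
```

On a multi-host cluster the same function would give one entry per machine.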

We can send signals to engine sets by id.

*(sending signals to just one engine is still a work in progress)*

In [7]:
import signal
import time

ar = rc[:].apply_async(time.sleep, 100)

In [8]:
# oops! I meant 1!

await cluster.signal_engines(signal.SIGINT)
ar.get()

Sending signal 2 to engine(s) 1623263481-w75s
Sending signal 2 to engine(s) 1623263483-iafz


CompositeError: one or more exceptions from call to method: sleep
[0:apply]: KeyboardInterrupt: 
[1:apply]: KeyboardInterrupt: 
[2:apply]: KeyboardInterrupt: 
[3:apply]: KeyboardInterrupt: 
.... 2 more exceptions ...
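The call above signalled every engine set. To target just one, the sketch below passes an engine set id along with the signal. `interrupt_engine_set` is a hypothetical helper, and the `engine_set_id` keyword is an assumption based on the "by id" description; check the signature of `signal_engines` in your installed version.

```python
import signal


async def interrupt_engine_set(cluster, engine_set_id):
    """Send SIGINT to a single engine set, leaving the others running."""
    # Assumption: signal_engines takes the target set as `engine_set_id`
    await cluster.signal_engines(signal.SIGINT, engine_set_id=engine_set_id)
```

For example, `await interrupt_engine_set(cluster, engine_set_2)` would be expected to interrupt only the two engines started with `start_engines_sync`.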

Now it's time to clean up. Every `start_` method has a corresponding `stop_` method.

We can stop one engine set at a time with `stop_engines`:

In [9]:
await cluster.stop_engines(engine_set_2)

Stopping engine(s): 1623263483-iafz


Or stop the whole cluster:

In [10]:
await cluster.stop_cluster()

Stopping engine(s): 1623263481-w75s
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 81906}
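When start and stop calls are paired by hand, a `try`/`finally` block helps make sure the cluster is stopped even if the work in between fails. A minimal sketch, using a hypothetical helper `run_and_stop` and assuming `work` is a coroutine function that accepts the cluster:

```python
async def run_and_stop(cluster, n, work):
    """Start `cluster` with `n` engines, await `work(cluster)`, always stop."""
    await cluster.start_cluster(n=n)
    try:
        return await work(cluster)
    finally:
        # Runs whether `work` returned or raised
        await cluster.stop_cluster()
```

If `work` raises, the exception still propagates to the caller, but only after `stop_cluster` has been awaited.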


## Cluster as a context manager

A `Cluster` can also be used as a context manager,
in which case:

1. entering the context manager starts the cluster
2. the `as` target is bound to a connected client
3. the context is only entered when all the engines are fully registered and available
4. when the context exits, the cluster is torn down

This makes it a lot easier to scope an IPython cluster for the duration of a computation
and ensure that it is cleaned up when you are done.

In [11]:
import os

with Cluster(n=4) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids

Starting 4 engines with 
Waiting for connection file: ~/.ipython/profile_default/security/ipcontroller-touchy-1623263508-mdel-client.json
Stopping engine(s): 1623263508-5i4g
Stopping controller


{0: 82284, 1: 82282, 2: 82283, 3: 82285}

Controller stopped: {'exit_code': 0, 'pid': 82281}


It can also be async:

In [12]:
async with Cluster(n=2) as rc:
    engine_pids = rc[:].apply_async(os.getpid).get_dict()
engine_pids

Starting 2 engines with 
Waiting for connection file: ~/.ipython/profile_default/security/ipcontroller-touchy-1623263514-nqk6-client.json
Stopping engine(s): 1623263514-b9f9
Stopping controller


{0: 82407, 1: 82408}

Controller stopped: {'exit_code': 0, 'pid': 82406}


## Launcher classes

IPython Parallel's mechanism for launching controllers and engines is called `Launchers`.
These are in `ipyparallel.cluster.launcher`.

There are two kinds of Launcher:

- ControllerLauncher, which starts a controller
- EngineSetLauncher, which starts `n` engines

You can use abbreviations to access the launchers that ship with IPython Parallel,
such as 'MPI', 'Local', or 'SGE',
or you can pass classes themselves (or their import strings, such as 'mymodule.MyEngineSetLauncher').
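Because launchers are selected by plain values, the choice can be built up as ordinary keyword arguments. The sketch below is illustrative only: `launcher_options` is a hypothetical helper, and `controller_launcher_class` is assumed to be the controller counterpart of the `engine_launcher_class` argument used in this document.

```python
def launcher_options(engines="Local", controller=None):
    """Build keyword arguments selecting launcher classes for Cluster(...)."""
    options = {"engine_launcher_class": engines}
    if controller is not None:
        # Assumption: the controller counterpart is `controller_launcher_class`
        options["controller_launcher_class"] = controller
    return options


# An abbreviation for a launcher that ships with IPython Parallel
print(launcher_options("MPI"))  # {'engine_launcher_class': 'MPI'}
# An import string for a custom launcher class
print(launcher_options("mymodule.MyEngineSetLauncher", controller="Local"))
```

The result would then be unpacked into the constructor, for example `Cluster(n=4, **launcher_options("MPI"))`.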

I'm going to start a cluster with engines using MPI:

In [13]:
cluster = Cluster(n=4, engine_launcher_class='MPI')
await cluster.start_cluster()
rc = cluster.connect_client()

Starting 4 engines with 


In [14]:
rc.wait_for_engines(4)
rc.ids

[0, 1, 2, 3]

Now I'm going to run a test with another new feature:

In [15]:
def uhoh():
    import time
    from mpi4py import MPI

    rank = MPI.COMM_WORLD.rank
    if rank == 0:
        print("rank 0: oh no.")
        1 / 0
    print(f"rank {rank}: barrier")
    MPI.COMM_WORLD.barrier()


ar = rc[:].apply_async(uhoh)
ar.get(timeout=2)

TimeoutError: Result not ready.

Uh oh! We are stuck in barrier because engine 0 failed.

Let's try interrupting and getting the errors:

In [16]:
import signal
await cluster.signal_engines(signal.SIGINT)
ar.get(timeout=2)

Sending signal 2 to engine(s) 1623263525-cijg


TimeoutError: Result not ready.

It didn't work! This is because MPI.barrier isn't actually interruptible 😢.

We are going to have to resort to more drastic measures, and *restart* the engines:

In [17]:
await cluster.restart_engines()

Stopping engine(s): 1623263525-cijg
Starting 4 engines with 
engine set stopped 1623263525-cijg: {'exit_code': -9, 'pid': 82790}


In [19]:
rc.wait_for_engines(4)
rc.ids

[4, 5, 6, 7]

We are now back to having 4 responsive engines.
Their IPP engine ids may have changed, but I can get back to using them.
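The escalation used above (interrupt first, restart only if that fails) can be captured in one coroutine. This is a sketch under stated assumptions: `interrupt_or_restart` is a hypothetical helper, `ar` is the `AsyncResult` of the stuck call, and readiness is checked with its `wait(timeout)` and `ready()` methods instead of catching the timeout error from `get`.

```python
import signal


async def interrupt_or_restart(cluster, ar, timeout=2):
    """Return True if SIGINT was enough, False if engines were restarted."""
    await cluster.signal_engines(signal.SIGINT)
    # Give the engines `timeout` seconds to respond to the interrupt
    ar.wait(timeout)
    if ar.ready():
        return True
    # Still stuck (for example in an MPI barrier): restart the engines
    await cluster.restart_engines()
    return False
```

After a restart, remember to call `rc.wait_for_engines(...)` again before submitting new work, since the engines register with new ids.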

In [20]:
def get_rank():
    from mpi4py import MPI

    return MPI.COMM_WORLD.rank

rank_map = rc[:].apply_async(get_rank).get_dict()
rank_map

{4: 0, 5: 3, 6: 1, 7: 2}
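Since engine ids and MPI ranks do not line up, it can help to invert the mapping so that an engine can be looked up by rank. A minimal sketch with a hypothetical helper, `engine_for_rank`, using the values from the output above:

```python
def engine_for_rank(rank_map):
    """Invert {engine id: MPI rank} into {MPI rank: engine id}."""
    inverted = {rank: engine_id for engine_id, rank in rank_map.items()}
    if len(inverted) != len(rank_map):
        raise ValueError("two engines reported the same MPI rank")
    return inverted


rank_map = {4: 0, 5: 3, 6: 1, 7: 2}  # copied from the output above
by_rank = engine_for_rank(rank_map)
print(by_rank[0])  # 4
```

With this, `rc[by_rank[0]]` selects the engine that is running MPI rank 0.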

Finally, clean everything up:

In [21]:
await cluster.stop_cluster()

Stopping engine(s): 1623263525-cijg
Stopping controller
Controller stopped: {'exit_code': 0, 'pid': 82767}
engine set stopped 1623263525-cijg: {'exit_code': 1, 'pid': 82998}
