## Multiprocessing

Threads and processes

Multithreading in Python is split into two groups: multithreading and multiprocessing.

Multithreading means you have one Python process. Due to the way that Python is implemented with Global Interpreter Lock (GIL), you can only run one Python instruction at a time, even from multiple threads. This is very limiting, but not the end of the world for multithreading. One loophole is that this only is valid for *Python* instructions; as long as they don't change Python's internal memory model (like changing refcounts), *compiled* code is allowed to escape the GIL. This include JIT code like Numba!

The other method is multiprocessing. This involves creating two or more Python *processes*, with their own memory space, then either transferring data (via Pickle) or by sharing selected portions of memory (less common before Python 3.8, but possible). This is much heaver-weight than threading, but can be used effectively.

#### Built-in libraries

* **Thread**: The core low-level threading library.
* **Threading**: A basic interface to thread, still rather low-level by modern standards.
* **Multiprocessing**: Similar to threading, but with processes. Shared memory tools added in Python 3.8.
* **Concurrent.futures:** Higher-level interface to both threading and multiprocessing. Introduced in Python 3.3 and backported in PyPI.
* **Ascynio:** Explicit control over switching points.

In [None]:
import asyncio
import concurrent.futures
import sys # noqa: F401
import threading
import time

import matplotlib.pyplot as plt
import numpy as np

In [None]:
def prepare(height, width):
 c = np.sum(
 np.broadcast_arrays(*np.ogrid[-1j : 0j : height * 1j, -1.5 : 0 : width * 1j]),
 axis=0,
 )
 fractal = np.zeros_like(c, dtype=np.int32)
 return c, fractal


def run(c, fractal, maxiterations=20):
 z = c

 for i in range(1, maxiterations + 1):
 z = z**2 + c # Compute z
 diverge = abs(z) > 2 # Divergence criteria

 z[diverge] = 2 # Keep number size small
 fractal[~diverge] = i # Fill in non-diverged iteration number

 return fractal

In [None]:
size = 4000, 3000

In [None]:
c, fractal = prepare(*size)

In [None]:
%%time
fractal = run(c, fractal)

In [None]:
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);

## Raw threading

In [None]:
import threading

c, fractal = prepare(*size)


def piece(i):
 ci = c[10 * i : 10 * (i + 1), :]
 fi = fractal[10 * i : 10 * (i + 1), :]
 run(ci, fi)


workers = []
for i in range(size[0] // 10):
 workers.append(threading.Thread(target=piece, args=(i,)))

Note: You can also use the OO interface, which I sometimes prefer.

```python
class Worker(threading.Thread):
 def __init__(self, c, fractal, i):
 super(Worker, self).__init__()
 self.c = c
 self.fractal = fractal
 self.i = i
 def run(self):
 run(self.c[10*self.i : 10*(self.i + 1), :], self.fractal[10*self.i : 10*(self.i + 1), :])

workers = []
for i in range(size[0]//10):
 workers.append(Worker(c, fractal, i))
```

In [None]:
%%time
for worker in workers:
 worker.start()
for worker in workers:
 worker.join()

In [None]:
len(workers)

In [None]:
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);

## High level: Executors

Python 3 introduced an "executor" interface that manages workers for you. Instead of creating threads or processes with a `run` method, you create an executor and send work to it. It has the same interface for threads and processes.

In [None]:
executor = concurrent.futures.ThreadPoolExecutor(max_workers=8)

In [None]:
def piece(i):
 ci = c[10 * i : 10 * (i + 1), :]
 fi = fractal[10 * i : 10 * (i + 1), :]
 run(ci, fi)


c, fractal = prepare(*size)

In [None]:
%%time
futures = executor.map(piece, range(size[0] // 10))
for _ in futures: # iterating over them waits for the results
 pass

In [None]:
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);

## Async: no threads at all

Python has native support for async functions, which don't use threads at all, and were originally even implemented on top of generators. This is a control scheme that makes all suspension points explicit, using the "await" keyword. This is much easier to program multiple threads sharing data, since you can see the await points; you are running in a single thread except for being able to temporarily give control elsewhere when "await" is present.

This _can_, however, be integrated with threading, and can take advantage of functions that release the GIL.

In [None]:
async def compute_async():
 await asyncio.gather(*(asyncio.to_thread(piece, x) for x in range(size[0] // 10)))

In [None]:
c, fractal = prepare(*size)

IPython supports top-level await, but it does not support it in some magics, including the time related magics. So we'll replace the time magic ourselves.

In [None]:
start = time.time()
await compute_async()
print(time.time() - start)

In [None]:
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);

## Shared Memory and Multiprocessing

It's often difficult to get "perfect scaling," N times more work from N threads, in real situations. Even though this problem is "embarrassingly parallel" (none of the workers need to know other workers' results), there can be scheduling overhead, contention for memory, or slow-downs due to Python's [Global Interpreter Lock](https://realpython.com/python-gil/).

One way to avoid the global interpreter lock is to send work to separate processes. Python interpreters in separate processes do not share memory and therefore do not need to coordinate.

However, that means that we can't send data by simply sharing variables. We have to send it through a `multiprocessing.Queue` (which serializes— pickles— the data so that it can go through a pipe). In Python 3.8, we have the new `multiprocessing.shared_memory` module!

You can share arrays among processes if you declare them as shared memory before launching the subprocesses. Python has a special type for this:

In [None]:
%%writefile multiproc.py

import concurrent.futures
import multiprocessing.shared_memory
import time

import numpy as np

size = 4000, 3000


def prepare(height, width):
 c = np.sum(
 np.broadcast_arrays(*np.ogrid[-1j : 0j : height * 1j, -1.5 : 0 : width * 1j]),
 axis=0,
 )
 fractal = np.zeros_like(c, dtype=np.int32)
 return c, fractal


def run(c, fractal, maxiterations=20):
 z = c

 for i in range(1, maxiterations + 1):
 z = z**2 + c # Compute z
 diverge = abs(z) > 2 # Divergence criteria

 z[diverge] = 2 # Keep number size small
 fractal[~diverge] = i # Fill in non-diverged iteration number

 return fractal


c, orig_fractal = prepare(*size)


def piece(i):
 mem = multiprocessing.shared_memory.SharedMemory(name="perfdistnumpy")
 fractal = np.ndarray(shape=c.shape, dtype=np.int32, buffer=mem.buf)

 ci = c[10 * i : 10 * (i + 1), :]
 fi = fractal[10 * i : 10 * (i + 1), :]
 run(ci, fi)
 mem.close()


if __name__ == "__main__":
 d_size = np.int32().itemsize * np.prod(orig_fractal.size)

 mem = multiprocessing.shared_memory.SharedMemory(
 name="perfdistnumpy", create=True, size=d_size
 )
 try:
 fractal = np.ndarray(shape=c.shape, dtype=np.int32, buffer=mem.buf)
 fractal[...] = orig_fractal

 executor = concurrent.futures.ProcessPoolExecutor(max_workers=8)

 start = time.time()
 futures = executor.map(piece, range(size[0] // 10))
 for _ in futures: # iterating over them waits for the results
 pass
 print(time.time() - start, "seconds")
 print(fractal)
 finally:
 mem.close()
 mem.unlink()

This is shared across processes and can enen outlive the owning process, so make _sure_ you close (per process) and unlink (once) the memory you take! Having a fixed name (like above) can be safer.

In [None]:
!{sys.executable} multiproc.py

## Dask (external packages)

Now let's try an external package: Dask.

Still, there needs to be a better way. Our array slices in `piece` are fragile: an indexing error can ruin the result. Can't the problem of scattering work be generalized?

In [None]:
import dask.array

In [None]:
c, fractal = prepare(*size)

c = dask.array.from_array(c, chunks=(10, size[1]))
fractal = dask.array.from_array(fractal, chunks=(10, size[1]))

In [None]:
%%time
fractal = run(c, fractal)

That's AWESOME! So fast for so little. Let's take a look!

In [None]:
fractal

What the heck? This is not an array: it is a description of how to make an array. Dask has stepped through our procedure and built an execution graph, encoding all the dependencies so that it can correctly apply it to individual chunks. When we execute this graph, Dask will send a chunk to each processor in the computer and combine results.

In [None]:
%%time
fractal = fractal.compute()

In [None]:
fig, ax = plt.subplots(figsize=(12, 8))
ax.imshow(fractal);

Much better and less exciting. Dask now did our chunking for us. It's not any faster than single threaded, though; we need to point it to a scheduler/worker. This could be local, or it could be many computers anywhere in the world!

We seem to have paid for this simplicity: it took twice as long as the carefully sliced `pieces` in the executor.

The reason is that our code is not as simple as it looks. It has masking and piecemeal assignments, which in principle could introduce complex dependencies. _We_ know that everything will be fine if you just chop up the array in independent sections— and thus we implemented our thread and executor-based solutions that way.

Let me show you what Dask has to do for a 1×1 chunking of our problem.

In [None]:
c, fractal = prepare(1, 1) # try 2, 2
c = dask.array.from_array(c, chunks=(1, 1))
fractal = dask.array.from_array(fractal, chunks=(1, 1))
fractal = run(c, fractal, maxiterations=1) # try more iterations
fractal.visualize()

If that were all, I'd probably stick to chopping up the grid by hand (when possible). However, _exactly the same interface_ that distributes work across cores in my laptop can distribute work around the world, just by pointing it to a remote scheduler.

This is truly the ~~lazy~~ busy researcher approach!

> Note to self: launch
> 
> ```bash
> dask-scheduler &
> dask-worker --nthreads 8 127.0.0.1:8786 &
> ```
> 
> in a terminal now.

In [None]:
import dask.distributed

client = dask.distributed.Client("127.0.0.1:8786")
client

In [None]:
c, fractal = prepare(*size)

c = dask.array.from_array(c, chunks=(100, size[1]))
fractal = dask.array.from_array(fractal, chunks=(100, size[1]))
fractal = run(c, fractal)

In [None]:
%%time
fractal = client.compute(fractal, sync=True)

Well, that was exciting!

In the end, this example took longer than the single-core version, but it illustrates how array operations _can be_ distributed in a simple way.

I haven't shown very much of what Dask can do. It's a general toolkit for delayed and distributed evaluation. As such, it provides a nice way to work on Pandas-like DataFrames that are too large for memory:

In [None]:
import dask.dataframe

df = dask.dataframe.read_csv("data/nasa-exoplanets.csv")
df

We don't see the data because they haven't been loaded. But we can get them if we need them.

In [None]:
df[["pl_hostname", "pl_pnum"]].compute()

Additionally, Dask isn't the only project filling this need. There's also:

 * **Joblib:** annotate functions to execute remotely with decorators.
 * **Parsl:** same, but work with conventional schedulers (Condor, Slurm, GRID); an academic project.
 * **PySpark:** Spark is a big, scalable project, though its Python interface has performance issues.

and many smaller projects.

(Distributed computing hasn't been fully figured out yet.)