In [None]:
#default_exp parallel

In [None]:
#export
from fastcore.imports import *
from fastcore.foundation import *
from fastcore.basics import *
from fastcore.xtras import *
from functools import wraps

# from contextlib import contextmanager,ExitStack
from multiprocessing import Process, Queue
import concurrent.futures,time
from multiprocessing import Manager
from threading import Thread

In [None]:
from fastcore.test import *
from nbdev.showdoc import *
from fastcore.nb_imports import *

# Parallel

> Threading and multiprocessing functions

In [None]:
#export
def threaded(f):
    "Decorator: each call to `f` runs in a new `Thread`, which is started and returned"
    @wraps(f)
    def _launch(*args, **kwargs):
        # The caller gets the thread (not `f`'s result) so it can be joined later.
        thread = Thread(target=f, args=args, kwargs=kwargs)
        thread.start()
        return thread
    return _launch

In [None]:
@threaded
def _1():
    # Longer sleep: started first, but its message is printed last.
    time.sleep(0.05)
    print("second")

@threaded
def _2():
    # Shorter sleep: prints before `_1` does.
    time.sleep(0.01)
    print("first")

# Each call starts a thread and returns without waiting for it.
_1()
_2()
# Keep the cell alive long enough for both threads to print.
time.sleep(0.1)

first
second


In [None]:
#export
def startthread(f):
    "Like `threaded`, but start thread immediately (calling `f` with no arguments) and return it"
    # Returning the thread lets callers `join()` it; when used as a decorator,
    # the decorated name is bound to the running thread.
    return threaded(f)()

In [None]:
# `startthread` runs the decorated function in a thread at definition time,
# so no explicit call is needed.
@startthread
def _():
    # Longer sleep: prints last.
    time.sleep(0.05)
    print("second")

@startthread
def _():
    # Shorter sleep: prints first.
    time.sleep(0.01)
    print("first")

# Keep the cell alive long enough for both threads to print.
time.sleep(0.1)

first
second


In [None]:
#export
def set_num_threads(nt):
    "Get numpy (and others) to use `nt` threads"
    # `mkl` and `torch` are optional: skip whichever is missing or fails to configure.
    # `except Exception` keeps this best-effort without swallowing
    # `KeyboardInterrupt`/`SystemExit` the way a bare `except:` would.
    try: import mkl; mkl.set_num_threads(nt)
    except Exception: pass
    try: import torch; torch.set_num_threads(nt)
    except Exception: pass
    os.environ['IPC_ENABLE']='1'
    # Environment values must be strings, hence `str(nt)`.
    for o in ['OPENBLAS_NUM_THREADS','NUMEXPR_NUM_THREADS','OMP_NUM_THREADS','MKL_NUM_THREADS']:
        os.environ[o] = str(nt)

This sets the number of threads consistently for many tools, by:

1. Setting the following environment variables to `nt`: `OPENBLAS_NUM_THREADS`, `NUMEXPR_NUM_THREADS`, `OMP_NUM_THREADS`, `MKL_NUM_THREADS`
2. Setting `nt` threads for numpy (via `mkl`) and pytorch, when those packages are installed.

In [None]:
#export
def _call(lock, pause, n, g, item):
    l = False
    if pause:
        try:
            l = lock.acquire(timeout=pause*(n+2))
            time.sleep(pause)
        finally:
            if l: lock.release()
    return g(item)

In [None]:
#export
class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    "Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        # Called before the serial-mode adjustment below; `map` later reads
        # `self.pause`, `self.max_workers` and `self.on_exc`.
        store_attr()
        self.not_parallel = max_workers==0
        # The base executor needs a positive worker count, even though serial mode never uses the pool.
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        "Return an iterator of `f(item, *args, **kwargs)` for each of `items`; runs serially if `max_workers==0`"
        g = partial(f, *args, **kwargs)
        if self.not_parallel:
            # Serial mode uses the builtin `map`, so no lock (and no Manager process) is needed.
            self.lock = None
            return map(g, items)
        # `_call` only touches the lock when `pause` is truthy, so avoid starting a
        # Manager server process that would never be used.
        self.lock = Manager().Lock() if self.pause else None
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        # NOTE(review): this only catches errors raised while submitting work (returning None
        # after calling `on_exc`); presumably exceptions raised by `f` itself surface when
        # the returned iterator is consumed -- confirm.
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)

In [None]:
# Render the generated documentation for the class under a level-4 heading.
show_doc(ThreadPoolExecutor, title_level=4)

<h4 id="ThreadPoolExecutor" class="doc_header"><code>class</code> <code>ThreadPoolExecutor</code><a href="" class="source_link" style="float:right">[source]</a></h4>

> <code>ThreadPoolExecutor</code>(**`max_workers`**=*`64`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\*\*`kwargs`**) :: [`ThreadPoolExecutor`](/parallel.html#ThreadPoolExecutor)

Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution

In [None]:
#export
class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    "Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution"
    def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):
        if max_workers is None: max_workers=defaults.cpus
        # Called before the serial-mode adjustment below; `map` later reads
        # `self.pause`, `self.max_workers` and `self.on_exc`.
        store_attr()
        self.not_parallel = max_workers==0
        # The base executor needs a positive worker count, even though serial mode never uses the pool.
        if self.not_parallel: max_workers=1
        super().__init__(max_workers, **kwargs)

    def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):
        "Return an iterator of `f(item, *args, **kwargs)` for each of `items`; runs serially if `max_workers==0`"
        g = partial(f, *args, **kwargs)
        if self.not_parallel:
            # Serial mode uses the builtin `map`, so no lock (and no Manager process) is needed.
            self.lock = None
            return map(g, items)
        # `_call` only touches the lock when `pause` is truthy, so avoid starting a
        # Manager server process that would never be used. A Manager lock (rather than
        # a plain one) is used because it is sent to the worker processes inside `_g`.
        self.lock = Manager().Lock() if self.pause else None
        _g = partial(_call, self.lock, self.pause, self.max_workers, g)
        # NOTE(review): this only catches errors raised while submitting work (returning None
        # after calling `on_exc`); presumably exceptions raised by `f` itself surface when
        # the returned iterator is consumed -- confirm.
        try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)
        except Exception as e: self.on_exc(e)

In [None]:
# Render the generated documentation for the class under a level-4 heading.
show_doc(ProcessPoolExecutor, title_level=4)

<h4 id="ProcessPoolExecutor" class="doc_header"><code>class</code> <code>ProcessPoolExecutor</code><a href="" class="source_link" style="float:right">[source]</a></h4>

> <code>ProcessPoolExecutor</code>(**`max_workers`**=*`64`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\*\*`kwargs`**) :: [`ProcessPoolExecutor`](/parallel.html#ProcessPoolExecutor)

Same as Python's ProcessPoolExecutor, except can pass `max_workers==0` for serial execution

In [None]:
#export
# `fastprogress` is optional: when it cannot be imported `progress_bar` is None and
# the functions below skip progress display. `except Exception` (rather than a bare
# `except:`) lets `KeyboardInterrupt`/`SystemExit` propagate.
try: from fastprogress import progress_bar
except Exception: progress_bar = None

In [None]:
#export 
def parallel(f, items, *args, n_workers=defaults.cpus, total=None, progress=None, pause=0,
             threadpool=False, timeout=None, chunksize=1, **kwargs):
    "Applies `f` in parallel to `items`, using `n_workers`"
    # `threadpool=True` selects threads; otherwise worker processes are used.
    pool = ThreadPoolExecutor if threadpool else ProcessPoolExecutor
    # A progress bar is shown only if requested *and* `fastprogress` is available.
    show_progress = progress and progress_bar
    if show_progress and total is None:
        # The bar needs a length: materialize iterables (e.g. generators) that lack `len()`
        # instead of failing with a `TypeError`.
        if not hasattr(items, '__len__'): items = list(items)
        total = len(items)
    with pool(n_workers, pause=pause) as ex:
        # Extra `*args`/`**kwargs` are forwarded to every call of `f`.
        r = ex.map(f,items, *args, timeout=timeout, chunksize=chunksize, **kwargs)
        if show_progress: r = progress_bar(r, total=total, leave=False)
        # Collect results while the executor is still open.
        return L(r)

In [None]:
def add_one(x, a=1): 
    # Small random delay so items finish at varying times.
    time.sleep(random.random()/80)
    return x+a

inp,exp = range(50),range(1,51)
# Process pool, thread pool, and serial (`n_workers=0`) must all give the same results in input order.
test_eq(parallel(add_one, inp, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, threadpool=True, n_workers=2, progress=False), exp)
test_eq(parallel(add_one, inp, n_workers=0), exp)
# Extra keyword arguments (`a=2`) are forwarded to `add_one`.
test_eq(parallel(add_one, inp, n_workers=1, a=2), range(2,52))
test_eq(parallel(add_one, inp, n_workers=0, a=2), range(2,52))

Use the `pause` parameter to ensure a pause of `pause` seconds between processes starting. This is in case there are race conditions in starting some process, or to stagger the time each process starts, for example when making many requests to a webserver. Set `threadpool=True` to use `ThreadPoolExecutor` instead of `ProcessPoolExecutor`.

In [None]:
from datetime import datetime

In [None]:
def print_time(i): 
    # Tiny random delay, then print the item with the current timestamp.
    time.sleep(random.random()/1000)
    print(i, datetime.now())

# With `pause=0.25` the printed timestamps are spaced roughly a quarter of a second apart.
parallel(print_time, range(5), n_workers=2, pause=0.25);

1 2020-12-11 19:32:27.930895
0 2020-12-11 19:32:28.181040
2 2020-12-11 19:32:28.431867
3 2020-12-11 19:32:28.682935
4 2020-12-11 19:32:28.933846


Note that `f` should accept a collection of items.

In [None]:
#export
def run_procs(f, f_done, args):
    "Start one `Process` running `f` per item of `args`, yield everything from `f_done()`, then join the processes"
    # Each item of `args` becomes the `args` tuple of its own `Process`.
    procs = L(args).map(Process, args=arg0, target=f)
    for proc in procs: proc.start()
    yield from f_done()
    # Reached only once `f_done()` has been fully consumed.
    for proc in procs: proc.join()

In [None]:
#export
def _f_pg(obj, queue, batch, start_idx):
    for i,b in enumerate(obj(batch)): queue.put((start_idx+i,b))

def _done_pg(queue, items): return (queue.get() for _ in items)

In [None]:
#export 
def parallel_gen(cls, items, n_workers=defaults.cpus, **kwargs):
    "Run `cls(**kwargs)` over `items` split across `n_workers` processes, yielding `(index, result)` tuples"
    if n_workers==0:
        # Serial path: one instance handles everything in the current process.
        serial_results = list(cls(**kwargs)(items))
        yield from enumerate(serial_results)
        return
    batches = L(chunked(items, n_chunks=n_workers))
    # Running totals of the batch lengths give each batch's starting index.
    start_idxs = L(itertools.accumulate(0 + batches.map(len)))
    queue = Queue()
    if progress_bar: items = progress_bar(items, leave=False)
    # Worker: receives a `(batch, start_idx)` pair and puts its indexed results on `queue`.
    worker = partial(_f_pg, cls(**kwargs), queue)
    # Collector: takes one result off `queue` per element of `items`.
    collect = partial(_done_pg, queue, items)
    yield from run_procs(worker, collect, L(batches,start_idxs).zip())

In [None]:
class _C:
    # Minimal callable: lazily adds 1 to every element of the batch it is given.
    def __call__(self, o): return ((i+1) for i in o)

items = range(5)

# Multi-process run: results are `(index, value)` tuples, sorted by index before comparing.
res = L(parallel_gen(_C, items, n_workers=3))
idxs,dat1 = zip(*res.sorted(itemgetter(0)))
test_eq(dat1, range(1,6))

# The serial path (`n_workers=0`) must give the same values.
res = L(parallel_gen(_C, items, n_workers=0))
idxs,dat2 = zip(*res.sorted(itemgetter(0)))
test_eq(dat2, dat1)

`cls` is any class with `__call__`. It will be passed `kwargs` when initialized. Note that `n_workers` instances of `cls` are created, one in each process. `items` are then split in `n_workers` batches and one is sent to each `cls`. The function then returns a generator of tuples of item indices and results.

In [None]:
class TestSleepyBatchFunc:
    "For testing parallel processes that run at different speeds"
    def __init__(self): self.a=1
    def __call__(self, batch):
        for k in batch:
            # Random delay per item, so the two workers progress at different rates.
            time.sleep(random.random()/4)
            yield k+self.a

x = np.linspace(0,0.99,20)
res = L(parallel_gen(TestSleepyBatchFunc, x, n_workers=2))
# Sort the `(index, value)` tuples, then compare just the values against `x+1`.
test_eq(res.sorted().itemgot(1), x+1)

# Export -

In [None]:
#hide
# Run nbdev's notebook-to-module export; the output below lists the notebooks it converted.
from nbdev.export import notebook2script
notebook2script()

Converted 00_test.ipynb.
Converted 01_basics.ipynb.
Converted 02_foundation.ipynb.
Converted 03_xtras.ipynb.
Converted 03a_parallel.ipynb.
Converted 03b_net.ipynb.
Converted 04_dispatch.ipynb.
Converted 05_transform.ipynb.
Converted 07_meta.ipynb.
Converted 08_script.ipynb.
Converted index.ipynb.
