{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#default_exp parallel" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "from fastcore.imports import *\n", "from fastcore.foundation import *\n", "from fastcore.basics import *\n", "from fastcore.xtras import *\n", "from functools import wraps\n", "\n", "# from contextlib import contextmanager,ExitStack\n", "from multiprocessing import Process, Queue\n", "import concurrent.futures,time\n", "from multiprocessing import Manager, set_start_method\n", "from threading import Thread\n", "\n", "# Best-effort: on macOS prefer the 'fork' start method; `set_start_method` raises if the\n", "# start method has already been set, in which case we silently keep the existing one.\n", "try: \n", " if sys.platform == 'darwin': set_start_method(\"fork\")\n", "except: pass" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from fastcore.test import *\n", "from nbdev.showdoc import *\n", "from fastcore.nb_imports import *" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallel\n", "\n", "> Threading and multiprocessing functions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def threaded(f):\n", " \"Run `f` in a thread, and return the thread\"\n", " @wraps(f)\n", " def _f(*args, **kwargs):\n", " res = Thread(target=f, args=args, kwargs=kwargs)\n", " res.start()\n", " return res\n", " return _f" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "first\n", "second\n" ] } ], "source": [ "@threaded\n", "def _1():\n", " time.sleep(0.05)\n", " print(\"second\")\n", "\n", "@threaded\n", "def _2():\n", " time.sleep(0.01)\n", " print(\"first\")\n", "\n", "_1()\n", "_2()\n", "time.sleep(0.1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def startthread(f):\n", " \"Like `threaded`, but start thread immediately\"\n", " threaded(f)()" ] }, {
"cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "first\n", "second\n" ] } ], "source": [ "@startthread\n", "def _():\n", " time.sleep(0.05)\n", " print(\"second\")\n", "\n", "@startthread\n", "def _():\n", " time.sleep(0.01)\n", " print(\"first\")\n", "\n", "time.sleep(0.1)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def set_num_threads(nt):\n", " \"Get numpy (and others) to use `nt` threads\"\n", " try: import mkl; mkl.set_num_threads(nt)\n", " except: pass # best-effort: mkl may not be installed\n", " try: import torch; torch.set_num_threads(nt)\n", " except: pass # best-effort: torch may not be installed\n", " os.environ['IPC_ENABLE']='1'\n", " for o in ['OPENBLAS_NUM_THREADS','NUMEXPR_NUM_THREADS','OMP_NUM_THREADS','MKL_NUM_THREADS']:\n", " os.environ[o] = str(nt)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This sets the number of threads consistently for many tools, by:\n", "\n", "1. Setting the following environment variables equal to `nt`: `OPENBLAS_NUM_THREADS`,`NUMEXPR_NUM_THREADS`,`OMP_NUM_THREADS`,`MKL_NUM_THREADS`\n", "2. Setting `nt` threads for numpy and pytorch."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def _call(lock, pause, n, g, item):\n", " \"Call `g(item)`, optionally sleeping `pause` secs first while holding `lock`\"\n", " acquired = False\n", " if pause:\n", " try:\n", " # Bounded wait so a stuck lock can't hang the worker forever\n", " acquired = lock.acquire(timeout=pause*(n+2))\n", " time.sleep(pause)\n", " finally:\n", " if acquired: lock.release()\n", " return g(item)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "def check_parallel_num(param_name, num_workers):\n", " \"Return `num_workers`, forced to 0 (with a warning) on Windows notebooks where multiprocessing gets stuck\"\n", " if sys.platform == \"win32\" and IN_NOTEBOOK and num_workers > 0:\n", " print(\"Due to IPython and Windows limitation, python multiprocessing isn't available now.\")\n", " print(f\"So `{param_name}` is changed to 0 to avoid getting stuck\")\n", " num_workers = 0\n", " return num_workers" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#export\n", "class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):\n", " \"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution\"\n", " def __init__(self, max_workers=defaults.cpus, on_exc=print, pause=0, **kwargs):\n", " if max_workers is None: max_workers=defaults.cpus\n", " store_attr()\n", " self.not_parallel = max_workers==0\n", " # The base class requires at least one worker, even when we then run serially\n", " if self.not_parallel: max_workers=1\n", " super().__init__(max_workers, **kwargs)\n", "\n", " def map(self, f, items, *args, timeout=None, chunksize=1, **kwargs):\n", " \"Map `partial(f, *args, **kwargs)` over `items`, serially if `max_workers==0`\"\n", " if not self.not_parallel: self.lock = Manager().Lock()\n", " g = partial(f, *args, **kwargs)\n", " if self.not_parallel: return map(g, items)\n", " _g = partial(_call, self.lock, self.pause, self.max_workers, g)\n", " try: return super().map(_g, items, timeout=timeout, chunksize=chunksize)\n", " except Exception as e: self.on_exc(e)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "data": { "text/markdown": [ "
class
ThreadPoolExecutor
[source]ThreadPoolExecutor
(**`max_workers`**=*`20`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ThreadPoolExecutor`](/parallel.html#ThreadPoolExecutor)\n",
"\n",
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
],
"text/plain": [
"class
ThreadPoolExecutor
[source]ThreadPoolExecutor
(**`max_workers`**=*`20`*, **`on_exc`**=*`print`*, **`pause`**=*`0`*, **\\*\\*`kwargs`**) :: [`ThreadPoolExecutor`](/parallel.html#ThreadPoolExecutor)\n",
"\n",
"Same as Python's ThreadPoolExecutor, except can pass `max_workers==0` for serial execution"
],
"text/plain": [
"