{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fparallel.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Parallelism and beyond" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We discussed `async_pipe` and `async_pipe_unordered` in the context of trying to put more \"concurrency\" into our program by taking advantage of parallelism. What does \"parallelism\" mean here?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Facing the reality of python concurrency, again" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With `async_pipe` and `async_pipe_unordered`, by giving them more coroutine instances to work with, we achieved higher throughput. But that is only because our coroutines are, in a quite literal sense, sleeping on the job: to simulate real jobs, we called `await` on `asyncio.sleep`. The event loop, faced with this await, just puts the coroutine on hold until it is ready to act again." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now it is entirely possible that this behaviour --- of not letting sleeping coroutines block the whole program --- is all you need. In particular, if you are dealing with network connections or sockets *and* you are using a proper asyncio-based library, then \"doing network work\" isn't too much from sleeping on the loop." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, for other operations *not* tailored for asyncio, you will *not* get any speed-up with parallelism based on asyncio. Crucially, *asyncio has no built-in support for file accesses*." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see an example:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n", "2.009612141060643\n" ] } ], "source": [ "import asyncio\n", "import time\n", "import aiochan as ac\n", "\n", "async def worker(n):\n", " time.sleep(0.1) # await asyncio.sleep(0.1)\n", " return n*2\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).async_pipe(10, worker).collect())\n", " print(asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The only different than before (when we first introduced `async_pipe`) is that we replaced `asyncio.sleep` with `time.sleep`. With this change, we did not get *any* speed up." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, we can recover our speed-up by using the method `parallel_pipe` instead:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n", "0.20713990507647395\n" ] } ], "source": [ "import asyncio\n", "import time\n", "import aiochan as ac\n", "\n", "def worker(n):\n", " time.sleep(0.1)\n", " return n*2\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).parallel_pipe(10, worker).collect())\n", " print(asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When using `parallel_pipe`, our `worker` has to be a normal function instead of an async function. As before, if order is not important, `parallel_pipe_unordered` can give you even more throughput:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n", "ordered time: 0.35387236496899277\n", "[16, 2, 8, 24, 6, 10, 0, 32, 22, 34, 12, 36, 4, 38, 28, 18, 30, 20, 14, 26]\n", "unordered time: 0.19887939398176968\n" ] } ], "source": [ "import asyncio\n", "import time\n", "import random\n", "import aiochan as ac\n", "\n", "def worker(n):\n", " time.sleep(random.uniform(0, 0.2))\n", " return n*2\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).parallel_pipe(10, worker).collect())\n", " print('ordered time:', asyncio.get_event_loop().time() - start)\n", "\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).parallel_pipe_unordered(10, worker).collect())\n", " print('unordered time:', asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In fact, `parallel_pipe` works by starting a thread-pool and execute the workers in the thread-pool. Multiple threads can solve the problem of workers sleeping on the thread, as in our example. But remember that the default implementation of python, the CPython, has a global interpreter lock (GIL) which prevents more than one python statement executing at the same time. Will `parallel_pipe` help in the presence of GIL, besides the case of workers just sleeping?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It turns out that for the majority of serious cases, multiple threads help even in the presence of the GIL, because most of the heavy-lifting operations, for example file accesses, are implemented in C instead of in pure python, and in C it is possible to release the GIL when not interacting with the python runtime. In addition to file accesses, if you are doing number-crunching, then hopefully you are not doing it in pure python but instead relies on dedicated libraries like numpy, scipy, etc. All of these libraries release the GIL when it makes sense to do so. So using `parallel_pipe` is usually enough." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What if you just have to do your CPU-intensive tasks in python? Well, `parallel_pipe` and `parallel_pipe_unordered` takes an argument called `mode`, which by default takes the value `'thread'`. If you change it to `'process'`, then a process-pool instead of a thread-pool will be used. Let's see a comparison:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "using threads 1.7299788249656558\n", "using threads 0.20847543003037572\n" ] } ], "source": [ "import asyncio\n", "import time\n", "import aiochan as ac\n", "\n", "def worker(_):\n", " total = 0\n", " for i in range(1000000):\n", " total += i\n", " return total\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " await ac.from_range(20).parallel_pipe(10, worker).collect()\n", " print('using threads', asyncio.get_event_loop().time() - start)\n", "\n", " start = asyncio.get_event_loop().time()\n", " await ac.from_range(20).parallel_pipe(10, worker, mode='process').collect()\n", " print('using threads', asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Why not use a process pool in all cases? Processes have much greater overhead than threads, and also far more restrictions on their use. Crucially, you cannot share any object unless you do some dirty work yourself, and anything you pass to your worker, or return from your worker, must be picklable." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In our example, our worker is a pure function. It is also possible to prepare some structures in each worker before-hand. In python 3.7 or above, there are the `initializer` and `init_args` arguments accepted by `parallel_pipe` and `parallel_pipe_unordered`, which will be passed to the construction to the pool executors to do the setup. Prior to python 3.7, such a setup is still possible with some hack: you can put the object to be set up in a `threading.local` object, and for *every* worker execution, check if the object exists, and if not, do the initialization:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "setting up processor\n", "setting up processor\n", "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n" ] } ], "source": [ "import asyncio\n", "import time\n", "import random\n", "import threading\n", "import aiochan as ac\n", "\n", "worker_data = threading.local()\n", "\n", "def worker(n):\n", " try:\n", " processor = worker_data.processor\n", " except:\n", " print('setting up processor')\n", " worker_data.processor = lambda x: x*2\n", " processor = worker_data.processor\n", " return processor(n)\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).parallel_pipe(2, worker).collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Since we used two thread workers, the setup is done twice. This also works for `mode='process'`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What about parallelising work across the network? Or more exotic workflows? At its core, *aiochan* is a library that facilitates moving data around within the boundary of a single process on a single machine, but there is nothing preventing you using channels at the end-points of a network-based parallelism framework such as message queues or a framework like *dart*. Within its bounday, *aiochan* aims to give you maximum flexibility in developing concurrent workflows, and you should use *aiochan* it in tandem with some other suitable libraries or frameworks when you want to step out of its boundary." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Back to the main thread" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Speaking of stepping out of boundaries, one case is exceedingly common: you use an aiochan-based workflow to prepare a stream of values, but you want to consume these values outside of the asyncio event loop. In this case, there are convenience methods for you:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "work\n", "work\n" ] } ], "source": [ "loop = asyncio.new_event_loop()\n", "\n", "out = ac.Chan(loop=loop)\n", "\n", "async def worker():\n", " while True:\n", " await asyncio.sleep(0.1)\n", " if not (await out.put('work')):\n", " break\n", " \n", "ac.run_in_thread(worker(), loop=loop)\n", "\n", "it = out.to_iterable(buffer_size=1)\n", "\n", "print(next(it))\n", "print(next(it))\n", "\n", "loop.call_soon_threadsafe(out.close);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice how we constructed the channel on the main thread, with explicit arguments specifying on which loop the channel is to be used, and then derived a iterator from the queue. Also, to run the worker, we used `run_in_thread` with an explicit event loop given." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When creating the iterable, notice we have given it a `buffer_size`. This is used to construct a queue for inter-thread communication. You can also use a queue directly:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "work\n", "work\n" ] } ], "source": [ "import queue\n", "\n", "loop = asyncio.new_event_loop()\n", "\n", "out = ac.Chan(loop=loop)\n", "\n", "async def worker():\n", " while True:\n", " await asyncio.sleep(0.1)\n", " if not (await out.put('work')):\n", " break\n", " \n", "ac.run_in_thread(worker(), loop=loop)\n", "\n", "q = queue.Queue()\n", "\n", "out.to_queue(q)\n", "\n", "print(q.get())\n", "print(q.get())\n", "\n", "loop.call_soon_threadsafe(out.close);" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Other queues can be used as long as they follow the public API of `queue.Queue` and are thread-safe." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## aiochan without asyncio" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, before ending this tutorial, let's reveal a secret: you don't need asyncio to use aiochan! \"Isn't aiochan based on asyncio?\" Well, not really, the core algorithms of aiochan (which is based on those from Clojure's core.async) does not use any asyncio constructs: they run entirely synchronously. It is only when you use the use-facing methods such as `get`, `put` and `select` that an asyncio-facade was made to cover the internals." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "On the other hand, there are some functions (actually, three of them) that does not touch anything related to asyncio given the correct arguments:\n", "\n", "* `Chan.put_nowait`\n", "* `Chan.get_nowait`\n", "* `select`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Normally, when you call `ch.put_nowait(v)`, the put will succeed if it is possible to do so immediately (for example, if there is a pending get or buffer can be used), otherwise it will give up. Note that you never `await` on `put_nowait`. However, if you give the argument `immediate_only=True`, then if the operation cannot be completed immediately, it will be queued (but again, the pending queue can overflow). In addition, you can give a callback to the `cb` argument, which will be called when the put finally succeeds, with the same argument as the return value of `await put(v)`. The same is true with `get_nowait(immediate_only=True, cb=cb)`. For `select`, if you give a callback to the `cb` argument, then you should not call `await` on it, but instead rely on the callback being called eventually as `cb(return_value, which_channel)`. Note if you don't expect to use any event loops, when constructing channels, you should explicitly pass in `loop='no_loop'`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Example: this is our asyncio-based fan-in, fan-out:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 produces p0-0\n", "p0 produces p0-1\n", "p0 produces p0-2\n", "p0 produces p0-3\n", "p1 produces p1-0\n", "p2 produces p2-0\n", "c0 received p0-0\n", "c0 received p0-3\n", "c0 received p1-0\n", "c0 received p2-0\n", "c1 received p0-1\n", "c2 received p0-2\n", "p0 produces p0-4\n", "p1 produces p1-1\n", "p1 produces p1-2\n", "p1 produces p1-3\n", "p2 produces p2-1\n", "c0 received p0-4\n", "c0 received p1-3\n", "c0 received p2-1\n", "c1 received p1-1\n", "c2 received p1-2\n", "p1 produces p1-4\n", "p2 produces p2-2\n", "p2 produces p2-3\n", "p2 produces p2-4\n", "c0 received p1-4\n", "c0 received p2-4\n", "c1 received p2-2\n", "c2 received p2-3\n" ] } ], "source": [ "import aiochan as ac\n", "import asyncio\n", "\n", "async def consumer(c, tag):\n", " async for v in c:\n", " print('%s received %s' % (tag, v))\n", " \n", "async def producer(c, tag):\n", " for i in range(5):\n", " v = '%s-%s' % (tag, i)\n", " print('%s produces %s' % (tag, v))\n", " await c.put(v)\n", " \n", "async def main():\n", " c = ac.Chan()\n", " for i in range(3):\n", " ac.go(consumer(c, 'c' + str(i)))\n", " for i in range(3):\n", " ac.go(producer(c, 'p' + str(i)))\n", " await asyncio.sleep(0.1)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By the appropriate use of callbacks, we can write avoid using `asyncio` completely:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "c0 received p0-0\n", "p0 produces p0-0\n", "c1 received p0-1\n", "p0 produces p0-1\n", "c2 received p0-2\n", "p0 produces p0-2\n", "c0 received p0-3\n", "p0 produces p0-3\n", "c1 received p0-4\n", "c2 received p1-0\n", "p1 produces p1-0\n", "c0 received p1-1\n", "p1 produces p1-1\n", "c1 received p1-2\n", "p1 produces p1-2\n", "c2 received p1-3\n", "p1 produces p1-3\n", "c0 received p1-4\n", "c1 received p2-0\n", "p2 produces p2-0\n", "c2 received p2-1\n", "p2 produces p2-1\n", "c0 received p2-2\n", "p2 produces p2-2\n", "c1 received p2-3\n", "p2 produces p2-3\n", "c2 received p2-4\n" ] } ], "source": [ "def consumer(c, tag):\n", " def cb(v):\n", " if v is not None:\n", " print('%s received %s' % (tag, v))\n", " consumer(c, tag)\n", " c.get_nowait(immediate_only=False, cb=cb)\n", "\n", "def producer(c, tag, i=0):\n", " v = '%s-%s' % (tag, i)\n", " def cb(ok):\n", " if ok and i < 4:\n", " print('%s produces %s' % (tag, v))\n", " producer(c, tag, i+1)\n", " \n", " c.put_nowait(v, immediate_only=False, cb=cb)\n", " \n", "def main():\n", " c = ac.Chan(loop='no_loop')\n", " for i in range(3):\n", " consumer(c, 'c' + str(i))\n", " for i in range(3):\n", " producer(c, 'p' + str(i))\n", " \n", "main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The end result is (almost) the same. An example with `select`:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "select put into Chan, get value 1\n", "select put into Chan, get value 1\n", "select put into Chan, get value 2\n", "select put into Chan, get value 1\n", "select put into Chan, get value 1\n", "select put into Chan, get value 1\n", "select put into Chan, get value 2\n", "select put into Chan, get value 1\n", "select put into Chan, get value 2\n", "select put into Chan, get value 1\n" ] } ], "source": [ "def select_run():\n", " c = ac.Chan(1, loop='no_loop', name='c')\n", " d = ac.Chan(1, loop='no_loop', name='d')\n", " put_chan = None\n", "\n", " def put_cb(v, c):\n", " nonlocal put_chan\n", " put_chan = c\n", "\n", " ac.select((c, 1), (d, 2), cb=put_cb)\n", "\n", " get_val = None\n", "\n", " def get_cb(v, c):\n", " nonlocal get_val\n", " get_val = v\n", "\n", " ac.select(c, d, cb=get_cb)\n", "\n", " print('select put into %s, get value %s' % (put_chan, get_val))\n", " \n", "def main():\n", " for _ in range(10):\n", " select_run()\n", " \n", "main()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\"But why?\" Well, obviously writing callbacks is much harder than using asyncio. But who knows? Maybe you are writing some other, higher-level framework that can make use of the semantics of aiochan. The possibilities are endless! In particular, there are non-asyncio concurrency frameworks in python itself that utilizes the same coroutines, an example being `python-trio`. Since the core of aiochan does not rely on asyncio, porting it to `trio` is trivial." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }