{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fcoroutine.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Coroutines and event loops" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Programming languages begin in different ways, with different goals. Unfortunately, in the case of python, concurrency isn't one of the goals (unlike, say, in the case of Erlang and Elixir). As a result, concurrency in python feel a bit foreign, cumbersome and unnatural." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In particular, when you start your python interactive interpreter, or when you run your python script, your program begins in an environment that can be roughly described as a single-threaded process, with no obvious ladder that allows you to climb up onto a concurrent environment. So what are \"processes\" and \"threads\"?" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\"Process\" here means something different from the \"process\" in CSP: here it refers to an instance of your program that is executed for you by the operating system. A process has its own memory space and file descriptors provided by the operating system, and these are by default isolated from the other processes on the same system." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A process may have one or more *threads of execution* running at the same time, with each thread executing its part of the code in sequence, but different threads can share memory. If the sharing is not done carefully and in a principled way, as we have seen it will lead to non-deterministic results." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Python supports threads by providing the `thread` and `threading` module. But these are not too useful (compared to other languages, at least): in python, the danger of threads stepping on each other is so great that the default implementation, CPython, has a global interpreter lock, or GIL, that ensures that only a single python statement can execute at a given instance! Even with such a big fat lock always in place, we still need locks in order to prevent unwanted interleaving! So the GIL prevents us from using multiple processors. Locking has overheads. To make things still worse, python schedules thread execution in a somewhat unintuitive manner which results in favouring slow (or CPU-intensive) operations over fast ones, the opposite of what most operating system does. The end result: python code utilizing threads often runs *slower* than those that do not." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Python also supports spawning processes within python itself using the `multiprocessing` module. But by default processes don't share memory or resources, hence inter-process communicating is restricted and cumbersome. The overhead of using processes is even greater than using threads. Well, the picture isn't very pretty." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Still, not being able to deal with multiple things at once is stupid. 
Considering the situation we are in, it seems the best way to go forward is to have something that is single-threaded (lower overhead, hopefully) but that can imitate multiple threads of execution. And something along these lines has been built into the language since python 3.4: it is called `asyncio`." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Compared to plain python, `asyncio` utilizes two further keywords (since python 3.5, at least): `async` and `await`. `async` is applied to functions (and methods). An example:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "async def silly(x):\n", "    print('executing silly with', x)\n", "    return x+1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It seems normal enough. But when you call it:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "<coroutine object silly at 0x...>" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "silly(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It seems that the function doesn't execute but instead returns something called a coroutine. Calling an async function is a two-step process: first you call it normally and obtain a coroutine, and then the coroutine needs to be given to some scheduler, or *event loop*, for execution. The function `aiochan.run` will do the scheduling and executing part for you:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "executing silly with 2\n" ] }, { "data": { "text/plain": [ "3" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import aiochan as ac\n", "ac.run(silly(2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Every call to `ac.run` creates a new event loop, which runs until the passed-in async function finishes executing." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All this ceremony of using `async` and running coroutines in a strange way sets the stage for using the `await` keyword:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "counter: 1\n", "counter: 2\n", "counter: 3\n" ] } ], "source": [ "import asyncio\n", "\n", "async def count(tag, n_max=3):\n", "    i = 0\n", "    while i < n_max:\n", "        await asyncio.sleep(0.5)\n", "        i += 1\n", "        print(tag, i)\n", "\n", "ac.run(count('counter:'))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Whatever comes after the `await` keyword must be an *awaitable*, which roughly says that \"this computation here will eventually produce something, but maybe not right now, and while waiting you may go off and do something else: the scheduler will let you continue when the result is ready\"." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's see what happens when we run two counters (remember that the function `count`, when called, produces a coroutine, which is an awaitable):" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "counter a: 1\n", "counter a: 2\n", "counter a: 3\n", "counter b: 1\n", "counter b: 2\n", "counter b: 3\n" ] } ], "source": [ "async def main():\n", "    await count('counter a:')\n", "    await count('counter b:')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Hmm ... 
this doesn't look very concurrent: the second counter starts counting only after the first counter finishes. But this is what we asked for: we awaited the completion of the first counter!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To make the two counters execute together, we use the `aiochan.go` function, which takes a coroutine and schedules it for execution but does not wait for the result:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "counter b: 1\n", "counter a: 1\n", "counter b: 2\n", "counter a: 2\n", "counter b: 3\n", "counter a: 3\n" ] } ], "source": [ "async def main():\n", "    ac.go(count('counter a:'))\n", "    await count('counter b:')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Much better now. Note that you *must* pass the coroutine to `aiochan.go` for execution: calling the function by itself has no effect (other than a possible warning):" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/zh217/.pyenv/versions/3.6.6/lib/python3.6/site-packages/ipykernel_launcher.py:2: RuntimeWarning: coroutine 'count' was never awaited\n", " \n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "counter b: 1\n", "counter b: 2\n", "counter b: 3\n" ] } ], "source": [ "async def main():\n", "    count('counter a:')\n", "    await count('counter b:')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What happens when we replace both counter calls with `aiochan.go`?" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "async def main():\n", "    ac.go(count('counter a:'))\n", "    ac.go(count('counter b:'))\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Nothing happens! Remember that `ac.run` returns when the coroutine passed in returns, and our `main` returns after merely scheduling the two counters for execution, without actually running them!" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To make this clearer, note that if we sleep in the `main` function at the end, the two counters will be executed:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "counter a: 1\n", "counter b: 1\n", "counter a: 2\n", "counter b: 2\n", "counter a: 3\n", "counter b: 3\n" ] } ], "source": [ "async def main():\n", "    ac.go(count('counter a:'))\n", "    ac.go(count('counter b:'))\n", "    await asyncio.sleep(3)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you have done thread-based programming before, you may think by now that asyncio is no different from threading. This is not true. To illustrate, consider:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "before\n", "after\n", "do work\n" ] } ], "source": [ "async def main():\n", "    async def work():\n", "        print('do work')\n", "    print('before')\n", "    ac.go(work())\n", "    print('after')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "What you get is *always* `before`, `after`, and `do work`, in that order. In some languages, using threads, it is possible to get garbled text, since the various calls to `print` (or whatever it is called there) can step on each other. 
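Even with python threads, by the way, the *ordering* would not be guaranteed. Here is a rough thread-based analogue of the example above (a sketch using the standard `threading` module, which this tutorial does not otherwise rely on); run it a few times and `do work` may come out before or after `after`, because the operating system is free to switch threads at any point:\n", "\n", "```\n", "import threading\n", "\n", "def work():\n", "    print('do work')\n", "\n", "print('before')\n", "# start a real OS thread; we have no control over when it gets scheduled\n", "threading.Thread(target=work).start()\n", "print('after')\n", "```\n", "\n", "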
By contrast, asyncio event loops use only a single thread, and it is guaranteed that unless you `await`, nothing else will get in your way while you are executing. (If you read the documentation for asyncio, you will find that even things like locks and semaphores are marked \"not thread-safe\" --- they are only safe with respect to the non-interrupting guarantees provided by asyncio.)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So asyncio guarantees \"no break unless await\"; in other words, it implements *co-operative multitasking*, in contrast to the *pre-emptive multitasking* provided by threads (and your operating system), where your work may be interrupted at any time. Programming with co-operative multitasking is in general easier; however, it is sometimes necessary to explicitly give up control to the scheduler in order for other tasks to have a chance to run. In aiochan, you can await `aiochan.nop()` to achieve this:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "before\n", "do work\n", "after\n" ] } ], "source": [ "async def main():\n", "    async def work():\n", "        print('do work')\n", "    print('before')\n", "    ac.go(work())\n", "    await ac.nop()\n", "    print('after')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note the order. Also note that in this case, theoretically the order *isn't* guaranteed --- that you always get this order back should be considered an implementation detail." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now you know how to make coroutines and run them. That roughly corresponds to the \"sequential processes\" that we talked about before (and remember not to touch global state). In the next section, we will learn about the \"communicating\" part." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* Python was not designed for concurrency.\n", "* There are a number of ways to do concurrency in python: processes, threads, and asyncio event loops.\n", "* Asyncio event loops are single-threaded schedulers responsible for executing coroutines.\n", "* Coroutine functions are written with the `async` and `await` keywords. No interleaving of execution can occur unless an `await` keyword is encountered.\n", "\n", "Useful functions:\n", "\n", "* `aiochan.run`\n", "* `aiochan.go`\n", "* `aiochan.nop`\n", "* `asyncio.sleep`\n", "\n", "There is also `aiochan.run_in_thread`, which is recommended for scripts. `aiochan.run` is recommended when programming interactively." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Note about `ac.run` in Jupyter notebooks" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you use `ac.run` in Jupyter notebooks to run examples, sometimes you will see warnings saying something like:\n", "\n", "```\n", "Task was destroyed but it is pending!\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "*In the notebook*, this is mostly harmless: it is due to our not always closing all our coroutines when our main coroutine exits. We do this in order to keep our examples simple, and to avoid using constructs not already introduced. However, *in production*, any warning is a cause for concern."
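, "\n", "\n", "If you want your main coroutine to finish only after all the work it started is done, so that nothing is left pending, one option is to await the coroutines together with `asyncio.gather` instead of spawning them with `ac.go` and sleeping for a fixed time (this is plain asyncio, not an aiochan construct, and the examples above do not depend on it). A minimal sketch, reusing the `count` coroutine function from earlier:\n", "\n", "```\n", "async def main():\n", "    # run both counters concurrently and return only when both have finished\n", "    await asyncio.gather(count('counter a:'), count('counter b:'))\n", "\n", "ac.run(main())\n", "```"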
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Appendix: running async functions without aiochan" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In our exposition we used the function `aiochan.run` to run all our async functions. How to do it with native python libraries? We use asyncio event loops to do the execution:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", "import asyncio\n", "\n", "loop = asyncio.get_event_loop()\n", "result = loop.run_until_complete(silly(2))\n", "print('the result is', result)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can try running the above code. What you get depends on how you run it: if you run it in a script or in an interactive interpreter, then you will see printed:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", "executing silly with 2\n", "result: 3\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "However, if you run it in jupyter notebooks or jupyterlab, there is a possibility that you will get an exception thrown at your face (or maybe not, it all depends):" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", "---------------------------------------------------------------------------\n", "RuntimeError Traceback (most recent call last)\n", " in ()\n", " 2 \n", " 3 loop = asyncio.get_event_loop()\n", "----> 4 result = loop.run_until_complete(silly(2))\n", " 5 print('the result is', result)\n", "\n", "~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_until_complete(self, future)\n", " 453 future.add_done_callback(_run_until_complete_cb)\n", " 454 try:\n", "--> 455 self.run_forever()\n", " 456 except:\n", " 457 if new_task and future.done() and not future.cancelled():\n", "\n", "~/.pyenv/versions/3.6.6/lib/python3.6/asyncio/base_events.py in run_forever(self)\n", " 407 self._check_closed()\n", " 408 if self.is_running():\n", "--> 409 raise RuntimeError('This event loop is already running')\n", " 410 if events._get_running_loop() is not None:\n", " 411 raise RuntimeError(\n", "\n", "RuntimeError: This event loop is already running\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So we already have a loop running? Ok, it is still possible to proceed in this case" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "the result is :1>>\n", "executing silly with 2\n" ] } ], "source": [ "import asyncio\n", "\n", "loop = asyncio.get_event_loop()\n", "result = loop.create_task(silly(2))\n", "print('the result is', result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So apparently our async function is executed now, but now we only get a task back, not the result itself! To get the result:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 14, "metadata": {}, "output_type": "execute_result" } ], "source": [ "result.result()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "... which seems to be fine, but that is only because you are executing it interactively. 
If you put this line directly below the `print`, you will most certainly get:" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```\n", "---------------------------------------------------------------------------\n", "InvalidStateError Traceback (most recent call last)\n", "<ipython-input-...> in <module>()\n", " 4 result = loop.create_task(silly(2))\n", " 5 print('the result is', result)\n", "----> 6 result.result()\n", "\n", "InvalidStateError: Result is not set.\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "which tells you that you are calling the function too soon! You will need to wait a little bit (but if you do it wrong it will deadlock), or create a future and set its result using a callback. If you really want to figure it out you can read the python documentation." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So now you believe me when I say that doing concurrency in python feels foreign, cumbersome and unnatural. Running everything in a script is a solution, but one of the appeals of python is its interactivity." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can also replace calls to `aiochan.go` with `asyncio.get_running_loop().create_task`, but ... what a mouthful! `asyncio.ensure_future` is also a possibility, but in addition to its questionable name, its use for spawning tasks is discouraged since python 3.7 in favour of `asyncio.create_task`. However, `asyncio.create_task` doesn't exist prior to python 3.7. So ... if you intend to use `aiochan` at all, we urge you to stay with `aiochan.go`." ] } ], "metadata": { "celltoolbar": "Edit Metadata", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }