{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fchannel.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Channels" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we know how to make coroutines and schedule them for execution. As we said before, for coroutines to do IO safely and in a principled way, we use meeting points, which `aiochan` calls `Chan`, short for \"channel\". Constructing a channel is easy:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Chan<_unk_0 140664273474752>" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import aiochan as ac\n", "import asyncio\n", "\n", "c = ac.Chan()\n", "c" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now suppose we have a producer that can be tasked with producing items, and a consumer that can consume them. The IO in this case consists of the outputs of the producer and the inputs of the consumer, and the two are linked by a channel. 
In code:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n" ] } ], "source": [ "async def producer(c):\n", "    i = 0\n", "    while True:\n", "        await asyncio.sleep(0.1) # producing stuff takes time\n", "        i += 1\n", "        await c.put('product ' + str(i))\n", "\n", "async def consumer(c):\n", "    while True:\n", "        product = await c.get()\n", "        print('obtained:', product)\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    ac.go(producer(c))\n", "    ac.go(consumer(c))\n", "    await asyncio.sleep(0.6)\n", "    print('It is late, let us call it a day.')\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that `Chan` has two methods, `put` and `get`. `put` puts a value into the channel, and `get` takes one out. Both return awaitables, signaling that doing IO with channels involves potential waiting, as two parties need to come together for either of them to proceed. Awaiting a `get` produces the value that was `put` into the channel." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In `aiochan`, you cannot `put` `None` into a channel (other falsy values such as `0`, `0.0`, `[]`, `{}`, `False` are fine). The reason is that a channel can be closed, and we need some way to signal to its users that it is closed; we use `None` as that signal. Another possibility is throwing exceptions, but throwing exceptions in async code quickly gets *very* confusing. So, following Clojure's core.async, we don't allow `None` values in channels." 
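] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A minimal sketch of the point about falsy values, using only the calls introduced so far. The names `values` and `received` are made up for this illustration, and the sketch deliberately does not try to `put` a `None`, since what exactly happens in that case is not shown in this tutorial:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import aiochan as ac\n", "\n", "values = [0, 0.0, [], {}, False] # falsy, but none of them is None\n", "received = [] # hypothetical name: collects whatever the consumer gets\n", "\n", "async def producer(c):\n", "    for value in values:\n", "        await c.put(value)\n", "\n", "async def consumer(c):\n", "    # one get per value that the producer puts\n", "    for _ in values:\n", "        received.append(await c.get())\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    ac.go(producer(c))\n", "    await consumer(c)\n", "    print('received:', received)\n", "\n", "ac.run(main())"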
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Speaking of closing channels, note that in our previous example, `main` just walks away when it decides that everyone should go home. But `producer` and `consumer` are left dangling, which is very rude of `main`. Closing the channel is a polite way of notifying them both:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer goes home\n" ] } ], "source": [ "async def producer(c):\n", "    i = 0\n", "    while True:\n", "        await asyncio.sleep(0.1) # producing stuff takes time\n", "        i += 1\n", "        still_open = await c.put('product ' + str(i))\n", "        if not still_open:\n", "            print('producer goes home')\n", "            break\n", "\n", "\n", "async def consumer(c):\n", "    while True:\n", "        product = await c.get()\n", "        if product is not None:\n", "            print('obtained:', product)\n", "        else:\n", "            print('consumer goes home')\n", "            break\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    ac.go(producer(c))\n", "    ac.go(consumer(c))\n", "    await asyncio.sleep(0.6)\n", "    print('It is late, let us call it a day.')\n", "    c.close()\n", "    await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that after the channel is closed with `c.close()`, awaiting a `get` produces `None`, whereas awaiting a `put` produces `False` (before closing, it produces `True`)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "By the way, on Python 3.6 and above, we can simplify our consumer a bit: here we are just iterating over the values in the channel one by one, which is exactly what an asynchronous iterator does. 
So we can write:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer goes home\n" ] } ], "source": [ "async def producer(c):\n", "    i = 0\n", "    while True:\n", "        await asyncio.sleep(0.1) # producing stuff takes time\n", "        i += 1\n", "        still_open = await c.put('product ' + str(i))\n", "        if not still_open:\n", "            print('producer goes home')\n", "            break\n", "\n", "\n", "async def consumer(c):\n", "    async for product in c:\n", "        print('obtained:', product)\n", "    print('consumer goes home')\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    ac.go(producer(c))\n", "    ac.go(consumer(c))\n", "    await asyncio.sleep(0.6)\n", "    print('It is late, let us call it a day.')\n", "    c.close()\n", "    await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "It is also no longer necessary to test whether `product` is `None`: the iteration stops automatically when the channel is closed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that in `aiochan`, a channel is just an object; in some circles, this is called a \"first-class construct\". This means that it can be passed as an argument to functions (which we just did), returned from functions, or stored in a data structure for later use (unlike, say, in Erlang). It is even possible to go meta: a channel containing channels." 
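] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a rough sketch of going meta, here is a hypothetical request/reply arrangement in which each request *is* a channel on which the reply is expected. The names `server`, `requests`, `reply_chan` and `replies` are assumptions made for this illustration, not part of `aiochan`; only `Chan`, `put`, `get`, `close`, `go` and `run` come from the library:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import aiochan as ac\n", "\n", "replies = [] # hypothetical name: collects the answers seen by main\n", "\n", "async def server(requests):\n", "    # every item taken from `requests` is itself a channel\n", "    async for reply_chan in requests:\n", "        await reply_chan.put('pong')\n", "        reply_chan.close()\n", "\n", "async def main():\n", "    requests = ac.Chan() # a channel containing channels\n", "    ac.go(server(requests))\n", "    for _ in range(2):\n", "        reply_chan = ac.Chan()\n", "        await requests.put(reply_chan) # send a channel through a channel\n", "        replies.append(await reply_chan.get())\n", "    requests.close() # lets the server finish its loop\n", "    print('replies:', replies)\n", "\n", "ac.run(main())"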
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can also have our producer create the channel and return it:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer goes home\n" ] } ], "source": [ "async def producer():\n", "    c = ac.Chan()\n", "\n", "    async def work():\n", "        i = 0\n", "        while True:\n", "            await asyncio.sleep(0.1) # producing stuff takes time\n", "            i += 1\n", "            still_open = await c.put('product ' + str(i))\n", "            if not still_open:\n", "                print('producer goes home')\n", "                break\n", "\n", "    ac.go(work())\n", "    return c\n", "\n", "\n", "async def consumer(c):\n", "    async for product in c:\n", "        print('obtained:', product)\n", "    print('consumer goes home')\n", "\n", "async def main():\n", "    c = await producer()\n", "    ac.go(consumer(c))\n", "    await asyncio.sleep(0.6)\n", "    print('It is late, let us call it a day.')\n", "    c.close()\n", "    await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "But in this case, *not* letting the producer create its own channel actually has a benefit: we can easily have several producers working in parallel:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1 from p0\n", "obtained: product 1 from p1\n", "obtained: product 1 from p2\n", "obtained: product 2 from p0\n", "obtained: product 2 from p1\n", "obtained: product 2 from p2\n", "obtained: product 3 from p0\n", "obtained: product 3 from p1\n", "obtained: product 3 from p2\n", "obtained: product 4 from p0\n", "obtained: product 4 from p1\n", "obtained: product 4 from p2\n", 
"obtained: product 5 from p0\n", "obtained: product 5 from p1\n", "obtained: product 5 from p2\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer p0 goes home\n", "producer p1 goes home\n", "producer p2 goes home\n" ] } ], "source": [ "async def producer(c, tag):\n", " i = 0\n", " while True:\n", " await asyncio.sleep(0.1) # producing stuff takes time\n", " i += 1\n", " still_open = await c.put('product %s from %s' % (i, tag))\n", " if not still_open:\n", " print('producer %s goes home' % tag)\n", " break\n", " \n", " \n", "async def consumer(c):\n", " async for product in c:\n", " print('obtained:', product)\n", " print('consumer goes home')\n", " \n", "async def main():\n", " c = ac.Chan()\n", " for i in range(3):\n", " ac.go(producer(c, 'p%s' % i))\n", " ac.go(consumer(c))\n", " await asyncio.sleep(0.6)\n", " print('It is late, let us call it a day.')\n", " c.close()\n", " await asyncio.sleep(0.2) # necessary to wait for producer\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is call *fan-in*: different producers fanning their products into the same channel. 
We can also have *fan-out*:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "c0 obtained: product 1 from p0\n", "c1 obtained: product 1 from p1\n", "c2 obtained: product 1 from p2\n", "c0 obtained: product 2 from p0\n", "c1 obtained: product 2 from p1\n", "c2 obtained: product 2 from p2\n", "c0 obtained: product 3 from p0\n", "c1 obtained: product 3 from p1\n", "c2 obtained: product 3 from p2\n", "c0 obtained: product 4 from p0\n", "c1 obtained: product 4 from p1\n", "c2 obtained: product 4 from p2\n", "c0 obtained: product 5 from p0\n", "c1 obtained: product 5 from p1\n", "c2 obtained: product 5 from p2\n", "It is late, let us call it a day.\n", "consumer c0 goes home\n", "consumer c1 goes home\n", "consumer c2 goes home\n", "producer p0 goes home\n", "producer p1 goes home\n", "producer p2 goes home\n" ] } ], "source": [ "async def producer(c, tag):\n", "    i = 0\n", "    while True:\n", "        await asyncio.sleep(0.1) # producing stuff takes time\n", "        i += 1\n", "        still_open = await c.put('product %s from %s' % (i, tag))\n", "        if not still_open:\n", "            print('producer %s goes home' % tag)\n", "            break\n", "\n", "\n", "async def consumer(c, tag):\n", "    async for product in c:\n", "        print('%s obtained: %s' % (tag, product))\n", "    print('consumer %s goes home' % tag)\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    for i in range(3):\n", "        ac.go(producer(c, 'p%s' % i))\n", "    for i in range(3):\n", "        ac.go(consumer(c, 'c%s' % i))\n", "    await asyncio.sleep(0.6)\n", "    print('It is late, let us call it a day.')\n", "    c.close()\n", "    await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that work is divided evenly between producers and consumers automatically. 
Even if producers produce things at different rates, this fan-in, fan-out pattern will automatically do the right thing:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "c0 obtained: product 1 from p0\n", "c1 obtained: product 1 from p1\n", "c2 obtained: product 2 from p0\n", "c0 obtained: product 1 from p2\n", "c1 obtained: product 3 from p0\n", "c2 obtained: product 2 from p1\n", "c0 obtained: product 4 from p0\n", "c1 obtained: product 5 from p0\n", "c2 obtained: product 2 from p2\n", "c0 obtained: product 3 from p1\n", "c1 obtained: product 6 from p0\n", "c2 obtained: product 7 from p0\n", "c0 obtained: product 4 from p1\n", "c1 obtained: product 8 from p0\n", "c2 obtained: product 3 from p2\n", "c0 obtained: product 9 from p0\n", "It is late, let us call it a day.\n", "consumer c1 goes home\n", "consumer c2 goes home\n", "consumer c0 goes home\n", "producer p1 goes home\n", "producer p0 goes home\n" ] } ], "source": [ "async def producer(c, tag, interval):\n", "    i = 0\n", "    while True:\n", "        await asyncio.sleep(interval) # producing stuff takes time\n", "        i += 1\n", "        still_open = await c.put('product %s from %s' % (i, tag))\n", "        if not still_open:\n", "            print('producer %s goes home' % tag)\n", "            break\n", "\n", "\n", "async def consumer(c, tag):\n", "    async for product in c:\n", "        print('%s obtained: %s' % (tag, product))\n", "    print('consumer %s goes home' % tag)\n", "\n", "async def main():\n", "    c = ac.Chan()\n", "    for i in range(3):\n", "        ac.go(producer(c, ('p%s' % i), interval=(i+1)*0.1))\n", "    for i in range(3):\n", "        ac.go(consumer(c, 'c%s' % i))\n", "    await asyncio.sleep(1)\n", "    print('It is late, let us call it a day.')\n", "    c.close()\n", "    await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that jobs are still divided evenly between consumers, but more jobs 
come from faster producers." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* The construct for inter-coroutine communication is the channel.\n", "* Getting from and putting to channels facilitates IO between coroutines.\n", "* Channels are first-class constructs: we can pass them around, return them, or store them.\n", "* Channels can be closed.\n", "* `None` values are not allowed on channels.\n", "* Strategically closing channels can be used for execution control.\n", "* Fan-in and fan-out can be used for distributing work among different coroutines.\n", "\n", "Useful constructs:\n", "\n", "* `aiochan.Chan`\n", "* `aiochan.Chan.put`\n", "* `aiochan.Chan.get`\n", "* `aiochan.Chan.close`" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }