{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fmethods.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Methods and functions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we know the basics of channels and operations on them, we will learn about additional methods and functions that can be convenient in various situations." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Putting and getting" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we have already seen, we can `add` into a channel. Immediately closing the channel afterwards ensures that no further items can be put into the channel:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1\n", "2\n", "3\n", "put/get after closing: None\n" ] } ], "source": [ "import aiochan as ac\n", "import asyncio\n", "\n", "async def main():\n", " c = ac.Chan().add(1, 2, 3).close()\n", " async for v in c:\n", " print(v)\n", " await c.put(4)\n", " r = await c.get()\n", " print('put/get after closing:', r)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This method is mainly provided for convenience. You should NOT adding too much stuff into a channel in this way: it is non-blocking, the puts are accumulated, and if there are too many pending puts accumulated in this way overflow will occur. Adding fewer than 10 items during the initialization phase of a channel is considered ok though." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the last example we consumed values using the `async for` syntax. In case where we *must* deal with many values of the channel at once instead of one by one, we can use `collect`:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3]\n" ] } ], "source": [ "async def main():\n", " c = ac.Chan().add(1, 2, 3).close()\n", " r = await c.collect()\n", " print(r)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, closing the channel first before calling `collect` is essential: otherwise the `await` would block forever (and overflow would probably occur if values continuously come in)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`collect` also accepts an argument `n` which specifies the maximum number of elements that will be collected. Using it, we can `collect` on channels that are not yet closed (but we still need to think about how many items we can deal with):" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2]\n" ] } ], "source": [ "async def main():\n", " c = ac.Chan().add(1, 2, 3) # no closing\n", " r = await c.collect(2)\n", " print(r)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Above we have said that using `add` to add too many items is dangerous. If you have an existing sequence which you want to turn into a channel, it is much better to use `from_iter`:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 3, 4, 5, 6]\n", "True\n" ] } ], "source": [ "async def main():\n", " c = ac.from_iter([1, 2, 3, 4, 5, 6])\n", " r = await c.collect()\n", " print(r)\n", " print(c.closed)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that the channel is closed on construction (we can check whether a channel is closed by using the `.closed` property on a channel)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Infinite collections are ok:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n", "True\n", "[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]\n", "True\n" ] } ], "source": [ "def natural_numbers():\n", " i = 0\n", " while True:\n", " yield i\n", " i += 1\n", "\n", "async def main():\n", " c = ac.from_iter(natural_numbers())\n", " r = await c.collect(10)\n", " print(r)\n", " print(c.closed)\n", " r = await c.collect(10)\n", " print(r)\n", " print(c.closed)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Even when the channel is closed, values can still be obtained from it (and in this case values cannot be exhausted). Closing only stops putting operations immediately." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Making channels producing numbers is so common that we have a function for it:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n", "[0, 1, 2, 3, 4]\n", "[0, 3, 6, 9]\n" ] } ], "source": [ "async def main():\n", " c1 = ac.from_range()\n", " r = await c1.collect(10)\n", " print(r) # natural numbers\n", " \n", " c2 = ac.from_range(5) # same as ac.from_iter(range(5))\n", " r = await c2.collect()\n", " print(r)\n", " \n", " c3 = ac.from_range(0, 10, 3) # same as ac.from_iter(range(0, 10, 3))\n", " r = await c3.collect()\n", " print(r)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* `.add` can be used to add a few items into a channel on initialization (any other use is dangerous)\n", "* `.collect` can be used to bulk get items from channel\n", "* `.closed` tests if a channel is already closed\n", "* `from_iter` creates channels containing all elements from an iterable (even infinite iterable is ok)\n", "* `from_range` is tailored for making channels generating number series" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Time-based operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "So far we have always used `asyncio.sleep` to make execution stop for a little while, pretending to do work. We also have `timeout` function that does almost the same thing by producing a channel that automatically closes after an interval:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1.001395168947056\n" ] } ], "source": [ "async def main():\n", " start = asyncio.get_event_loop().time()\n", " c = ac.timeout(1.0)\n", " await c.get()\n", " end = asyncio.get_event_loop().time()\n", " print(end - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This is useful even when we are not pretending to do work, for example, for timeout control:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "do work\n", "do work\n", "do work\n", "do work\n", "do work\n", "done\n" ] } ], "source": [ "async def main():\n", " tout = ac.timeout(1.0)\n", " while (await ac.select(tout, default=True))[0]:\n", " print('do work')\n", " await asyncio.sleep(0.2)\n", " print('done')\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The above example is written in a somewhat terse style. You should try to understand why it achieves the closing on time behaviour." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As `timeout` produces a channel, which can be passed arount and `select`ed, it offers great flexibility for controlling time-based behaviours. However, using it for the ticks of a clock is harmful, as exemplified below:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0 0.10043401701841503\n", "1 0.20142484991811216\n", "2 0.30242938199080527\n", "3 0.4030482260277495\n", "4 0.5035843959776685\n", "5 0.6041081629227847\n", "6 0.7046528200153261\n", "7 0.8056348919635639\n", "8 0.9063465989893302\n", "9 1.0068686519516632\n", "10 1.1073921599891037\n", "11 1.2079381300136447\n", "12 1.3089604979613796\n", "13 1.4095268349628896\n", "14 1.5100650689564645\n", "15 1.6105891889892519\n", "16 1.7114433919778094\n", "17 1.81249319401104\n", "18 1.9130375039530918\n", "19 2.0135989299742505\n" ] } ], "source": [ "async def main():\n", " start = asyncio.get_event_loop().time()\n", " for i in range(20):\n", " await ac.timeout(0.1).get()\n", " print(i, asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The problem is that `timeout` guarantees that it will close *after* the specified time has elapsed, and will make an attempt to close as soon as possible, but it can never close at the precise instant. Over time, errors will accumulate. In the above example, we have already accumulated 0.01 seconds of error in mere 2 seconds." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If want something that ticks, use the `tick_tock` function:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0 0.1004815329797566\n", "1 0.2012625669594854\n", "2 0.3008053069934249\n", "3 0.4013087539933622\n", "4 0.5008452819893137\n", "5 0.6013440380338579\n", "6 0.7008649010676891\n", "7 0.8013983579585329\n", "8 0.900891529978253\n", "9 1.001404833048582\n", "10 1.100898704957217\n", "11 1.2013944609789178\n", "12 1.3008839710382745\n", "13 1.4013996929861605\n", "14 1.501174372038804\n", "15 1.6006878040498123\n", "16 1.701174663961865\n", "17 1.8006792459636927\n", "18 1.9011599159566686\n", "19 2.000674612005241\n" ] } ], "source": [ "async def main():\n", " start = asyncio.get_event_loop().time()\n", " ticker = ac.tick_tock(0.1)\n", " for i in range(20):\n", " await ticker.get()\n", " print(i, asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Errors are still unavoidable, but they do not accumulate." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* Use `timeout` to control the timing of operations (maybe together with `select`)\n", "* If the timing control is recurrent, consider using `tick_tock`" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Functional methods" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you have done any functional programming, you are certainly familiar with things like `map`, `reduce` (or `foldl`, `foldr`), `filter` and friends. Channels are armed with these so-called functional chainable methods which, when called, return new channels containing the expected elements." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Examples:" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "map [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]\n", "filter [0, 2, 4, 6, 8]\n", "take [0, 1, 2, 3, 4]\n", "drop [5, 6, 7, 8, 9]\n", "take_while [0, 1, 2, 3, 4]\n", "drop_while [5, 6, 7, 8, 9]\n" ] } ], "source": [ "async def main():\n", " print('map', await ac.from_range(10).map(lambda x: x*2).collect())\n", " print('filter', await ac.from_range(10).filter(lambda x: x % 2 == 0).collect())\n", " print('take', await ac.from_range(10).take(5).collect())\n", " print('drop', await ac.from_range(10).drop(5).collect())\n", " print('take_while', await ac.from_range(10).take_while(lambda x: x < 5).collect())\n", " print('drop_while', await ac.from_range(10).drop_while(lambda x: x < 5).collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is also `distinct`:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0]\n" ] } ], "source": [ "async def main():\n", " c = ac.from_iter([0,0,0,1,1,2,2,2,2,3,3,4,4,4,5,4,4,3,3,2,1,1,1,0])\n", " print(await c.distinct().collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that only *consecutive* values are tested for distinctness." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You probably know `reduce`, the so-called universal reducing function:" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[45]\n", "[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]\n" ] } ], "source": [ "async def main():\n", " print(await ac.from_range(10).reduce(lambda a, b: a+b).collect())\n", " print(await ac.from_range(10).reduce(lambda acc, nxt: acc + [nxt], init=[]).collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see, you can optionally pass an initial value for `reduce`. Notice that `reduce` only returns a value when the channel is closed: it turns a whole channel of values into a channel containing only a single value. Most of the time you may want intermediate results as well, so you probably want to use `scan` instead:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 1, 3, 6, 10, 15, 21, 28, 36, 45]\n", "[[], [0], [0, 1], [0, 1, 2], [0, 1, 2, 3], [0, 1, 2, 3, 4], [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]]\n" ] } ], "source": [ "async def main():\n", " print(await ac.from_range(10).scan(lambda a, b: a+b).collect())\n", " print(await ac.from_range(10).scan(lambda acc, nxt: acc + [nxt], init=[]).collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "All of these \"functional\" methods accept two optional values: `out` and `close`. As we have said previously, these functions operate by returning a new channel containing the processed values. If another channel is given as the `out` argument, then that channel will receive the processed values instead. Also, when the source channel is closed, by default the out channel will be as well. You can prevent this by setting `close` to `False`. This is illustrated below:" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "False\n", "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]\n" ] } ], "source": [ "async def main():\n", " out = ac.Chan(5) # we can use buffers as we please\n", " ac.from_range(10).map(lambda x: x*2, out=out, close=False)\n", " print(out.closed)\n", " print(await out.collect(10))\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* `map`, `reduce`, `filter`, `distinct`, `take`, `drop`, `take_while`, `drop_while`, `scan` do what you expect them to do.\n", "* You can control the construction of the output channel and whether to close it when the input is exhausted by specifying the `out` and `close` argument." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Pipeline methods" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are times that your processing is rather complicated to express with the above functional methods. For example, given the sequence `[1,2,1,3,1]`, you want to produce the sequence `[1,2,2,1,3,3,3,1]`. In this case you can use the `async_apply` method:" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[1, 2, 2, 3, 3, 3, 2, 2, 1]\n" ] } ], "source": [ "async def duplicate_face_value(inp, out):\n", " async for v in inp:\n", " for _ in range(v):\n", " await out.put(v)\n", " out.close()\n", " \n", "async def main():\n", " vals = [1,2,3,2,1]\n", " print(await ac.from_iter(vals).async_apply(duplicate_face_value).collect())\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You may think that this is not too different from connecting the channels yourself and spawn a processing coroutine with `go`. But writing it using `async_apply` makes your intention clearer." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Processing values in a channel and putting the result onto another channel is a very common theme. With `async_apply`, only a single coroutine is working on the values. With `async_pipe`, you can use multiple coroutine instances, getting closer to parallelism:" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n", "0.20754481800395297\n" ] } ], "source": [ "async def worker(n):\n", " await asyncio.sleep(0.1)\n", " return n*2\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).async_pipe(10, worker).collect())\n", " print(asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that processing 20 values only takes about 0.2 seconds even though processing a single value with a single coroutine takes 0.1 seconds: parallelism." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that the output values are in the correct order. This is the case even if later works complete earlier: `async_pipe` ensures the order while doing its best to have the minimal waiting time. However, in some cases the order is not important, in which case we can use `async_pipe_unordered`:" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]\n", "ordered time: 0.33254893589764833\n", "[14, 8, 6, 0, 20, 12, 24, 10, 22, 16, 36, 18, 2, 4, 26, 28, 38, 30, 32, 34]\n", "unordered time: 0.2875210080528632\n" ] } ], "source": [ "import random\n", "\n", "async def worker(n):\n", " await asyncio.sleep(random.uniform(0, 0.2))\n", " return n*2\n", "\n", "async def main():\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).async_pipe(10, worker).collect())\n", " print('ordered time:', asyncio.get_event_loop().time() - start)\n", "\n", " start = asyncio.get_event_loop().time()\n", " print(await ac.from_range(20).async_pipe_unordered(10, worker).collect())\n", " print('unordered time:', asyncio.get_event_loop().time() - start)\n", " \n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that unordered processing in the face of random processing time has efficiency advantage." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* use `async_apply` to give your custom processing pipeline a uniform look\n", "* use `async_pipe` for parallelism within asyncio\n", "* you can get more concurrency with `async_pipe_unordered`, but you give up the return order" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }