{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fcombine.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Combination operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Channels can act as versatile conduits for the flow of data, as in our examples of the fan-in and fan-out patterns. Here we discuss some convenient functions and constructs for dealing with more complicated patterns for combining data." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Merging values" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In fan-in, we passed the channels to all coroutines interested in producing data. Often we find that these producers provide their own output channels instead. In this case, we can `merge` these channels into a single one:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p2-0\n", "p1-0\n", "p2-1\n", "p1-1\n", "p2-2\n", "p1-2\n", "p2-3\n", "p1-3\n", "p2-4\n", "p1-4\n", "p1-5\n", "p2-5\n", "p1-6\n", "p2-6\n", "p2-7\n", "p1-7\n", "p2-8\n", "p1-8\n", "p2-9\n", "p1-9\n" ] } ], "source": [ "import aiochan as ac\n", "import asyncio\n", "\n", "async def main():\n", "    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))\n", "    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))\n", "    out = ac.merge(p1, p2)\n", "    async for v in out:\n", "        print(v)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As in manual fan-in, the order of the values is somewhat non-deterministic. 
If you want your channels to produce values in sync, use `zip_chans`:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['p1-0', 'p2-0']\n", "['p1-1', 'p2-1']\n", "['p1-2', 'p2-2']\n", "['p1-3', 'p2-3']\n", "['p1-4', 'p2-4']\n", "['p1-5', 'p2-5']\n", "['p1-6', 'p2-6']\n", "['p1-7', 'p2-7']\n", "['p1-8', 'p2-8']\n", "['p1-9', 'p2-9']\n" ] } ], "source": [ "async def main():\n", "    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))\n", "    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))\n", "    out = ac.zip_chans(p1, p2)\n", "    async for v in out:\n", "        print(v)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "An even more complicated use case is when your producers produce items at different rates, and you want to keep track of the *latest* value produced by each of them:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['p1-0', None]\n", "['p1-0', 'p2-0']\n", "['p1-1', 'p2-0']\n", "['p1-1', 'p2-1']\n", "['p1-1', 'p2-2']\n", "['p1-2', 'p2-2']\n", "['p1-2', 'p2-3']\n", "['p1-3', 'p2-3']\n", "['p1-4', 'p2-3']\n", "['p1-4', 'p2-4']\n", "['p1-5', 'p2-4']\n", "['p1-5', 'p2-5']\n", "['p1-6', 'p2-5']\n", "['p1-6', 'p2-6']\n", "['p1-7', 'p2-6']\n", "['p1-7', 'p2-7']\n", "['p1-7', 'p2-8']\n", "['p1-8', 'p2-8']\n", "['p1-9', 'p2-8']\n", "['p1-9', 'p2-9']\n" ] } ], "source": [ "async def main():\n", "    p1 = ac.from_range(10).map(lambda x: 'p1-' + str(x))\n", "    p2 = ac.from_range(10).map(lambda x: 'p2-' + str(x))\n", "    out = ac.combine_latest(p1, p2)\n", "    async for v in out:\n", "        print(v)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Notice that for a channel that has yet to produce a value, `None` is put in its place." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As for the functional methods, all of these functions take optional `out` and `close` arguments, controlling the output channel and whether to close the output channel when there is nothing more to be done (i.e., when all source channels are closed)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* use `merge` to combine values from several channels\n", "* use `zip_chans` to combine values from several channels in sync\n", "* use `combine_latest` to combine values and monitor the latest value from each channel" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Distributing values" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have discussed generalizations of fan-in above. For simple fan-out, we have `distribute`:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker0 received 0\n", "worker0 received 1\n", "worker0 received 4\n", "worker1 received 2\n", "worker2 received 3\n", "worker0 received 5\n", "worker0 received 8\n", "worker2 received 6\n", "worker1 received 7\n", "worker0 received 9\n", "worker0 received 12\n", "worker1 received 10\n", "worker2 received 11\n", "worker1 received 13\n", "worker1 received 16\n", "worker0 received 14\n", "worker2 received 15\n", "worker1 received 17\n", "worker2 received 18\n", "worker0 received 19\n" ] } ], "source": [ "async def worker(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    inputs = [ac.Chan(name='inp%s' % i) for i in range(3)]\n", "    ac.from_range(20).distribute(*inputs)\n", "    for idx, c in enumerate(inputs):\n", "        ac.go(worker(c, 'worker%s' % idx))\n", "    await asyncio.sleep(0.1)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "One of the benefits of using `distribute` instead of a plain fan-out is 
that, if one of the downstream channels is closed, `distribute` will try to put the value into another downstream channel so that no values are lost." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In fan-out and `distribute`, each downstream consumer obtains a non-overlapping subset of the input. Sometimes we want each consumer to consume the whole input instead. In this case, we want a duplicator, or `Dup` in aiochan. An example:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker0 received 0\n", "worker1 received 0\n", "worker1 received 1\n", "worker2 received 0\n", "worker0 received 1\n", "worker2 received 1\n", "worker2 received 2\n", "worker0 received 2\n", "worker1 received 2\n", "worker0 received 3\n", "worker0 received 4\n", "worker1 received 3\n", "worker2 received 3\n", "worker1 received 4\n", "worker2 received 4\n" ] } ], "source": [ "async def worker(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    dup = ac.from_range(5).dup()\n", "    inputs = [dup.tap() for i in range(3)]\n", "\n", "    for idx, c in enumerate(inputs):\n", "        ac.go(worker(c, 'worker%s' % idx))\n", "    await asyncio.sleep(0.1)\n", "    dup.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that `ch.dup` creates a duplicator, or `Dup`, and `dup.tap()` creates a new tap on the duplicator that receives the values put into the source channel. As for the functional methods, `dup.tap` accepts the arguments `out` and `close`, which control what is used as the output channel and whether to close the output channel when the input is closed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note that duplicated elements are put to downstream channels in order. 
This means that if any one of the downstream channels blocks on put for some reason, all progress will be blocked. You should consider giving downstream inputs some buffer if your downstream processors are uneven in their processing speed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A `Dup` also has the method `untap`, which can be used to untap an existing tapping channel. For example:" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "worker0 received 0\n", "worker2 received 0\n", "worker2 received 1\n", "worker0 received 1\n", "worker0 received 2\n", "worker0 received 3\n", "worker2 received 2\n", "worker2 received 3\n", "worker2 received 4\n", "worker0 received 4\n" ] } ], "source": [ "async def worker(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    dup = ac.from_range(5).dup()\n", "    inputs = [dup.tap() for i in range(3)]\n", "    dup.untap(inputs[1])\n", "\n", "    for idx, c in enumerate(inputs):\n", "        ac.go(worker(c, 'worker%s' % idx))\n", "    await asyncio.sleep(0.1)\n", "    dup.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We see that `worker1`, which has been untapped, does not receive anything. In more complicated programs, tapping and untapping can be done dynamically, at arbitrary times." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another very common idiom is pub-sub, and this is easy to do as well:" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 received 0\n", "px received 0\n", "px received 3\n", "p0 received 3\n", "p1 received 1\n", "p2 received 2\n", "p0 received 6\n", "px received 6\n", "p1 received 4\n", "p2 received 5\n", "p1 received 7\n", "p2 received 8\n", "p0 received 9\n", "px received 9\n" ] } ], "source": [ "async def processor(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    source = ac.Chan()\n", "    pub = source.pub(lambda x: x % 3)\n", "    p1 = pub.sub(1)\n", "    p2 = pub.sub(2)\n", "    p0 = pub.sub(0)\n", "    px = pub.sub(0)\n", "    ac.go(processor(p1, 'p1'))\n", "    ac.go(processor(p2, 'p2'))\n", "    ac.go(processor(p0, 'p0'))\n", "    ac.go(processor(px, 'px'))\n", "    source.add(0,1,2,3,4,5,6,7,8,9).close()\n", "    await asyncio.sleep(0.1)\n", "    pub.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, the topic is defined by the lambda, which gives the remainder when the item is divided by three. Processors subscribe to the topics they are interested in, and we see that `p0` and `px` received all numbers with remainder 0, `p1` all numbers with remainder 1, and `p2` all numbers with remainder 2." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A `Pub` also has a method `unsub`, which can be used to unsubscribe a currently subscribed channel. 
For example:" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 received 0\n", "p0 received 3\n", "p1 received 1\n", "px received 1\n", "px received 2\n", "p2 received 2\n", "p1 received 4\n", "px received 4\n", "px received 5\n", "p2 received 5\n", "p2 received 8\n", "px received 8\n", "px received 7\n", "p0 received 6\n", "p1 received 7\n", "p0 received 9\n" ] } ], "source": [ "async def processor(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    source = ac.Chan()\n", "    pub = source.pub(lambda x: x % 3)\n", "    p1 = pub.sub(1)\n", "    p2 = pub.sub(2)\n", "    p0 = pub.sub(0)\n", "    px = pub.sub(0)\n", "    ac.go(processor(p1, 'p1'))\n", "    ac.go(processor(p2, 'p2'))\n", "    ac.go(processor(p0, 'p0'))\n", "    ac.go(processor(px, 'px'))\n", "    pub.unsub(0, px)\n", "    pub.sub(1, px)\n", "    pub.sub(2, px)\n", "    source.add(0,1,2,3,4,5,6,7,8,9).close()\n", "    await asyncio.sleep(0.1)\n", "    pub.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this example, we initially subscribed `px` to the topic 0, but then changed our mind and subscribed it to 1 and 2 instead (yes, a channel can subscribe to multiple topics)." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There is also `unsub_all`, which can unsubscribe a whole topic in one go:" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p1 received 1\n", "p1 received 4\n", "p2 received 2\n", "p2 received 5\n", "p2 received 8\n", "p1 received 7\n" ] } ], "source": [ "async def processor(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    source = ac.Chan()\n", "    pub = source.pub(lambda x: x % 3)\n", "    p1 = pub.sub(1)\n", "    p2 = pub.sub(2)\n", "    p0 = pub.sub(0)\n", "    px = pub.sub(0)\n", "    ac.go(processor(p1, 'p1'))\n", "    ac.go(processor(p2, 'p2'))\n", "    ac.go(processor(p0, 'p0'))\n", "    ac.go(processor(px, 'px'))\n", "    pub.unsub_all(0)\n", "    source.add(0,1,2,3,4,5,6,7,8,9)\n", "    await asyncio.sleep(0.1)\n", "    pub.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the 0 topic does not have any subscribers after the call to `unsub_all`. If this method is called without an argument, *all* subscribers for *all* topics are unsubscribed." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the above examples, we have passed a lambda as the argument to `pub`. 
If we don't pass in anything, then the `Pub` assumes that values are tuples, and the first element of the tuple is the topic:" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "p0 received (0, 0)\n", "px received (0, 0)\n", "px received (0, 3)\n", "p0 received (0, 3)\n", "p1 received (1, 1)\n", "p2 received (2, 2)\n", "p0 received (0, 6)\n", "px received (0, 6)\n", "p1 received (1, 4)\n", "p2 received (2, 5)\n", "p1 received (1, 7)\n", "p2 received (2, 8)\n" ] } ], "source": [ "async def processor(inp, tag):\n", "    async for v in inp:\n", "        print('%s received %s' % (tag, v))\n", "\n", "async def main():\n", "    source = ac.Chan()\n", "    pub = source.pub()\n", "    p1 = pub.sub(1)\n", "    p2 = pub.sub(2)\n", "    p0 = pub.sub(0)\n", "    px = pub.sub(0)\n", "    ac.go(processor(p1, 'p1'))\n", "    ac.go(processor(p2, 'p2'))\n", "    ac.go(processor(p0, 'p0'))\n", "    ac.go(processor(px, 'px'))\n", "    source.add((0, 0),\n", "               (1, 1),\n", "               (2, 2),\n", "               (0, 3),\n", "               (1, 4),\n", "               (2, 5),\n", "               (0, 6),\n", "               (1, 7),\n", "               (2, 8),\n", "               (3, 9))\n", "    await asyncio.sleep(0.1)\n", "    pub.close()\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As before, `sub` methods all take `out` and `close` arguments that have their usual meaning." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To recap:\n", "\n", "* use `distribute` to distribute values to downstream channels\n", "* use `dup` to duplicate values\n", "* use `pub` for publisher-subscriber systems" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }