{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": { "nbsphinx": "hidden" }, "outputs": [], "source": [ "import sys\n", "sys.path.append('../')\n", "\n", "# import os\n", "# os.environ['PYTHONASYNCIODEBUG'] = '1'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: you can try this tutorial in [![Binder](https://mybinder.org/badge.svg)](https://mybinder.org/v2/gh/zh217/aiochan/master?filepath=doc%2Fquick.ipynb)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# A ten-minutes introduction" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You will need to import the module `aiochan` and `asyncio` first:" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import aiochan as ac\n", "import asyncio" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "A channel is like a golang channel or a Clojure core.async chan. Creating a channel is simple:" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Chan<_unk_0 140697829983528>" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "c = ac.Chan()\n", "c" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the following examples, we use `ac.run` to run the main coroutine. You can also run asyncio loops directly." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can call `await c.put(v)` to put value into the channel, `await c.get()` to get value from the channel, `c.close()` to close the channel, and `ac.go(...)` to spawn a coroutine inside another coroutine:" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer goes home\n" ] } ], "source": [ "async def producer(c):\n", " i = 0\n", " while True:\n", " await asyncio.sleep(0.1) # producing stuff takes time\n", " i += 1\n", " still_open = await c.put('product ' + str(i))\n", " if not still_open:\n", " print('producer goes home')\n", " break\n", "\n", "\n", "async def consumer(c):\n", " while True:\n", " product = await c.get()\n", " if product is not None:\n", " print('obtained:', product)\n", " else:\n", " print('consumer goes home')\n", " break\n", "\n", "async def main():\n", " c = ac.Chan()\n", " ac.go(producer(c))\n", " ac.go(consumer(c))\n", " await asyncio.sleep(0.6)\n", " print('It is late, let us call it a day.')\n", " c.close()\n", " await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Channel works as an async iterator:" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "obtained: product 1\n", "obtained: product 2\n", "obtained: product 3\n", "obtained: product 4\n", "obtained: product 5\n", "It is late, let us call it a day.\n", "consumer goes home\n", "producer goes home\n" ] } ], "source": [ "async def producer(c):\n", " i = 0\n", " while True:\n", " await asyncio.sleep(0.1) # producing stuff takes time\n", " i += 1\n", " still_open = await c.put('product ' + str(i))\n", " if not still_open:\n", " print('producer goes home')\n", " break\n", "\n", "\n", "async def consumer(c):\n", " async for product in c:\n", " print('obtained:', product)\n", " print('consumer goes home')\n", "\n", "async def main():\n", " c = ac.Chan()\n", " ac.go(producer(c))\n", " ac.go(consumer(c))\n", " await asyncio.sleep(0.6)\n", " print('It is late, let us call it a day.')\n", " c.close()\n", " await asyncio.sleep(0.2) # necessary to wait for producer\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "`select`, which is the whole point of channels, works as in golang or `alt!` in Clojure's core.async to complete one and only one operation non-deterministically:" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "received worker0-1\n", "received worker1-1\n", "received worker2-1\n", "received worker0-2\n", "received worker1-2\n", "received worker2-2\n", "received worker0-3\n", "received worker1-3\n", "received worker2-3\n", "received worker0-4\n", "received worker1-4\n", "received worker2-4\n", "received worker0-5\n", "received worker1-5\n", "received worker2-5\n", "consumer stopped\n", "worker0 stopped\n", "worker1 stopped\n", "worker2 stopped\n" ] } ], "source": [ "async def worker(out, stop, tag):\n", " i = 0\n", " while True:\n", " i += 1\n", " await asyncio.sleep(0.1)\n", " result, c = await ac.select(stop, (out, '%s-%s' % (tag, i)), priority=True)\n", " if c is stop:\n", " print('%s stopped' % tag)\n", " break\n", "\n", "async def consumer(c, stop):\n", " while True:\n", " result, c = await ac.select(stop, c, priority=True)\n", " if c is stop:\n", " print('consumer stopped')\n", " break\n", " else:\n", " print('received', result)\n", "\n", "async def main():\n", " c = ac.Chan()\n", " stop = ac.Chan()\n", " for i in range(3):\n", " ac.go(worker(c, stop, 'worker%s' % i))\n", " ac.go(consumer(c, stop))\n", " await asyncio.sleep(0.6)\n", " stop.close()\n", " await asyncio.sleep(0.2)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Channels can use some buffering to implement back-pressure:" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "producing 1\n", "producing 2\n", "producing 3\n", "consuming 1\n", "producing 4\n", "producing 5\n", "consuming 2\n", "producing 6\n", "consuming 3\n", "producing 7\n", "consuming 4\n", "producing 8\n" ] } ], "source": [ "async def worker(c):\n", " i = 0\n", " while True:\n", " i += 1\n", " await asyncio.sleep(0.05)\n", " print('producing', i)\n", " await c.put(i)\n", "\n", "async def consumer(c):\n", " while True:\n", " await asyncio.sleep(0.2)\n", " result = await c.get()\n", " print('consuming', result)\n", "\n", "async def main():\n", " c = ac.Chan(3)\n", " ac.go(worker(c))\n", " ac.go(consumer(c))\n", " await asyncio.sleep(1)\n", "\n", "ac.run(main())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Sliding and dropping buffers are also available." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "That's all the basics, but there are much more: functional methods, combination patterns, pipelines, thread or process-based parallelism and so on. Read the in-depth tutorial or the API documentation to find out more." ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.6" } }, "nbformat": 4, "nbformat_minor": 2 }