{ "cells": [ { "cell_type": "markdown", "id": "c4446edd", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "# A Brief History of Async\n", "\n", "

\n", " \n", "

\n", "\n", "[Open in nbviewer](https://nbviewer.org/github/nanvel/slides-async/blob/master/async_talk.ipynb)\n", "\n", "ThaiPy#85 2022-11-10\n", "\n", "[Oleksandr Polieno](https://github.com/nanvel)" ] }, { "cell_type": "markdown", "source": [ "Plan:\n", "- Parallelism and concurrency\n", "- Why do we need async, pros/cons\n", "- Event-driven io: from callbacks to async\n", "- IOLoop\n", "- Coroutines\n", "- async/await syntax\n", "- Overview\n", "\n", "Async - a first-class citizen in Python that simplifies concurrency implementation in a single thread." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Parallelism\n", "\n", "Distribute work across multiple computers:\n", "\n", "![parallelism](parallelism.png)\n", "\n", "A single computer:\n", "\n", "![computer](computer.png)\n", "\n", "![htop](htop.png)\n", "\n", "Using multiple CPU cores:\n", "\n", "![parallelism2](parallelism2.png)\n", "\n", "The Python Global Interpreter Lock (GIL) is a lock that allows only one thread to hold the control of the python interpreter." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Concurrency\n", "\n", "Concurrency == simulating parallelism by switching context.\n", "\n", "Executing multiple tasks at the same time but not necessarily simultaneously.\n", "\n", "![concurrency](concurrency.png)" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### Sequential execution" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 40, "outputs": [], "source": [ "import httpx\n", "\n", "\n", "def job(n):\n", " print(f\"--- request {n} sent\")\n", " httpx.get(f\"https://example.com/{n}\")\n", " print(f\"--- response {n} received\")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": 41, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--- request 1 sent\n", "--- response 1 received\n", "--- request 2 sent\n", "--- response 2 received\n", "CPU times: user 47.7 ms, sys: 7.25 ms, total: 54.9 ms\n", "Wall time: 2.43 s\n" ] } ], "source": [ "%%time\n", "\n", "job(1)\n", "job(2)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### Threads\n", "\n", "A thread is an execution context, which is all the information a CPU needs to execute a stream of instructions.\n", "\n", "Switching context every sys.getswitchinterval() (5ms default)" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 1, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "0.005\n" ] } ], "source": [ "import sys\n", "\n", "\n", "print(sys.getswitchinterval())" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": 43, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--- request 1 sent\n", "--- request 2 sent\n", "--- response 1 received\n", "--- response 2 received\n", "CPU times: user 37.3 ms, sys: 14.2 ms, total: 51.5 ms\n", "Wall time: 1.09 s\n" ] } ], "source": [ "%%time\n", "\n", "from functools import partial\n", "from threading import Thread\n", "\n", "\n", "thread_1 = Thread(target=partial(job, n=1))\n", "thread_2 = Thread(target=partial(job, n=2))\n", "\n", "thread_1.start()\n", "thread_2.start()\n", "\n", "thread_1.join()\n", "thread_2.join()" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### Async (cooperative multitasking with coroutines)\n", "\n", "Concurrency implementation that uses a single thread.\n", "\n", "Parts of an application cooperate to switch tasks explicitly at optimal times." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 44, "outputs": [], "source": [ "async def ajob(client, n):\n", " print(f\"--- request {n} sent\")\n", " await client.get(f\"https://example.com/{n}\")\n", " print(f\"--- response {n} received\")" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": 47, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "--- request 1 sent\n", "--- request 2 sent\n", "--- response 1 received\n", "--- response 2 received\n", "0.8680598735809326\n" ] } ], "source": [ "import asyncio\n", "import time\n", "\n", "\n", "async def amain():\n", " async with httpx.AsyncClient() as client:\n", " await asyncio.gather(\n", " ajob(client=client, n=1),\n", " ajob(client=client, n=2)\n", " )\n", "\n", "\n", "start = time.time()\n", "await amain()\n", "print(time.time() - start)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "![Multiprocessing, multithreading, async](multi_thread_async.png)\n", "\n", "Only one thread can be active inside a process at a time. Only one async task can be active inside a thread at a time." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Why do we need async?\n", "\n", "Cons of threads:\n", "- threads are heavier\n", "- the code has to be thread-safe\n", "- switching is not under our control (*)\n", "\n", "Cons of async:\n", "- special syntax (steeper learning curve)\n", "- need ioloop to execute\n", "- no blocking code allowed (*)" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Timeline\n", "\n" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "- 1991 - The first python release\n", "- 2001 - Simple generators ([PEP 255](https://peps.python.org/pep-0255/))\n", "- 2002 - Twisted - event driven networking engine\n", "- 2005\n", " * Python 2.5\n", " * Coroutines via enhanced generators ([PEP 342](https://peps.python.org/pep-0342/))\n", "- 2008\n", " * Python 2.6\n", " * Python 3.0\n", "- 2009 - Tornado opensourced by Facebook (developed by FriendFeed)\n", "- 2010 - Python 2.7\n", "- 2012\n", " * Python 3.3\n", " * proposed to make Tulip/asyncio a part of stdlib (the Tulip project is the asyncio module for Python)\n", " * `yield from`: Syntax for Delegating to a Subgenerator ([PEP 380](https://peps.python.org/pep-0380/) - 2009)\n", " * `return value` = `raise StopIteration(value)` in generators\n", "- 2014\n", " * Python 3.4\n", " * asyncio is a part of stdlib\n", "- 2015\n", " * Python 3.5\n", " * async/await syntax (native coroutines) ([PEP 492](https://peps.python.org/pep-0492/))\n", "- 2016\n", " * Python 3.6\n", " * Asynchronous generators ([PEP 525](https://peps.python.org/pep-0525/))\n", " * Asynchronous comprehensions ([PEP 530](https://peps.python.org/pep-0530/))\n", "- 2018\n", " * Python 3.7\n", " * support for [generator-based coroutines is deprecated](https://docs.python.org/3.7/library/asyncio-task.html#generator-based-coroutines) and is scheduled for removal in Python 3.10\n", " * FastAPI first release\n", " * Tornado integration with asyncio by default (Tornado IOLoop is a wrapper around asyncio.ioloop)\n", "- 2021 - Python 3.10" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } }, { "cell_type": "markdown", "source": [ "## From callbacks to async" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### Synchronous" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "from tornado.httpclient import HTTPClient\n", "\n", "\n", "def synchronous_fetch(url):\n", " http_client = HTTPClient()\n", " response = http_client.fetch(url)\n", " return response.body" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### With callbacks" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "from tornado.httpclient import AsyncHTTPClient\n", "\n", "\n", "def asynchronous_fetch_callbacks(url, callback):\n", " http_client = AsyncHTTPClient()\n", "\n", " def handle_response(response):\n", " callback(response.body)\n", "\n", " http_client.fetch(url, callback=handle_response)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "![IOLoop callback](ioloop_cb.png)\n", "\n", "`AsyncHTTPClient().fetch()` is not blocking the function." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### With `Future`" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "from tornado.concurrent import Future\n", "from tornado.httpclient import AsyncHTTPClient\n", "\n", "\n", "def asynchronous_fetch_future(url):\n", " http_client = AsyncHTTPClient()\n", " my_future = Future()\n", " fetch_future = http_client.fetch(url)\n", "\n", " def on_fetch(f):\n", " my_future.set_result(f.result().body)\n", "\n", " fetch_future.add_done_callback(on_fetch)\n", " return my_future" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "### With Tornado `gen` (generator based coroutine)\n", "\n", "Can be used in Python 2.5+." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "from tornado import gen\n", "from tornado.httpclient import AsyncHTTPClient\n", "\n", "\n", "@gen.coroutine\n", "def asynchronous_fetch_gen(url):\n", " http_client = AsyncHTTPClient()\n", " response = yield http_client.fetch(url)\n", " raise gen.Return(response.body)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "![coroutine yield](cotoutine_yield.png)" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### `yield from` and `return` for generator based coroutines" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "@asyncio.coroutine\n", "def asynchronous_fetch_gen(url):\n", " http_client = AsyncHTTPClient()\n", " response = yield http_client.fetch(url)\n", " return response.body\n", "\n", "\n", "@asyncio.coroutine\n", "def amain():\n", " result = yield from asynchronous_fetch_gen('https://example.com')\n", " return result" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "`yield from` and `return` support was added in Python 3.3 ([PEP 380 – Syntax for Delegating to a Subgenerator](https://peps.python.org/pep-0380/))." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### With `async`/`await` (native coroutine)" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "from tornado.httpclient import AsyncHTTPClient\n", "\n", "\n", "async def asynchronous_fetch(url):\n", " http_client = AsyncHTTPClient()\n", " response = await http_client.fetch(url)\n", " return response.body" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "async/await was added in Python 3.5 ([PEP 492 – Coroutines with async and await syntax](https://peps.python.org/pep-0492/)).\n", "\n", "Native coroutine (`async def`):\n", "- doesn't require `await`\n", "- runtime warning when garbage collected and not awaited\n", "- can not use `next()` on the coroutine (native coroutine is not a generator)\n", "- can not use `yield from` inside native coroutines\n", "- await validates that the right argument is awaitable\n", "\n", "Awaitable:\n", "- Coroutine\n", "- Future\n", "- Task (a subclass of Future)\n", "\n", "Future: low-level representation of a future result.\n", "\n", "Task: a subclass of Future that knows how to wrap and manage the execution of a coroutine; it is possible to cancel a task by using the task object.\n", "When a coroutine is wrapped in a task - automatically scheduled to run soon.\n", "\n", "`await` can be used only inside a coroutine.\n", "\n", "`async for`: supports async iterator, `__next__` -> `__anext__`.\n", "\n", "`async with`: supports async context managers, `__enter__` and `__exit__` -> `__aenter__` and `__aexit__`.\n", "\n", "`asyncio.Queue`: not thread safe, put/get are coroutines." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## IOLoop\n", "\n", "ioloop:\n", "- run asynchronous tasks and callbacks\n", "- perform network IO operations (efficiently handling io events, system events)\n", "- run blocking code in a thread or process pool" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# IOLoop Hello World!\n", "\n", "import asyncio\n", "\n", "\n", "def hello_world(loop):\n", " \"\"\"A callback to print 'Hello World' and stop the event loop\"\"\"\n", " print('Hello World')\n", " loop.stop()\n", "\n", "\n", "loop = asyncio.get_event_loop()\n", "\n", "# Schedule a call to hello_world()\n", "loop.call_soon(hello_world, loop)\n", "\n", "# Blocking call interrupted by loop.stop()\n", "try:\n", " loop.run_forever()\n", "finally:\n", " loop.close()" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "import asyncio\n", "import concurrent.futures\n", "\n", "\n", "def blocking_io():\n", " # File operations (such as logging) can block the\n", " # event loop: run them in a thread pool.\n", " with open('/dev/urandom', 'rb') as f:\n", " return f.read(100)\n", "\n", "\n", "if __name__ == '__main__':\n", " ioloop = asyncio.get_event_loop()\n", "\n", " with concurrent.futures.ThreadPoolExecutor() as pool:\n", " ioloop.run_until_complete(\n", " ioloop.run_in_executor(\n", " pool,\n", " blocking_io\n", " )\n", " )\n", "\n", " ioloop.close()" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "In long-running tasks, we can release IOLoop by calling `await asyncio.sleep(0)`." ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "### [Selectors](https://docs.python.org/3/library/selectors.html)\n", "\n", "Allows high-level and efficient I/O multiplexing.\n", "\n", "Can be used to wait for I/O readiness notification on multiple file objects.\n", "\n", "Based on [select](https://docs.python.org/3/library/select.html) that provides access to the select() and poll() functions available in most operating systems." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# https://docs.python.org/3/library/selectors.html\n", "\n", "import selectors\n", "import socket\n", "\n", "sel = selectors.DefaultSelector()\n", "\n", "def accept(sock, mask):\n", " conn, addr = sock.accept() # Should be ready\n", " print('accepted', conn, 'from', addr)\n", " conn.setblocking(False)\n", " sel.register(conn, selectors.EVENT_READ, read)\n", "\n", "def read(conn, mask):\n", " data = conn.recv(1000) # Should be ready\n", " if data:\n", " print('echoing', repr(data), 'to', conn)\n", " conn.send(data) # Hope it won't block\n", " else:\n", " print('closing', conn)\n", " sel.unregister(conn)\n", " conn.close()\n", "\n", "sock = socket.socket()\n", "sock.bind(('localhost', 1234))\n", "sock.listen(100)\n", "sock.setblocking(False)\n", "sel.register(sock, selectors.EVENT_READ, accept)\n", "\n", "while True:\n", " events = sel.select()\n", " for key, mask in events:\n", " callback = key.data\n", " callback(key.fileobj, mask)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "![selectors](selectors.png)" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Generators and coroutines\n", "\n", "Generators are able to give up control to the caller without losing their state, with an option to resume the execution." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 7, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "start\n", "1\n", "2\n", "3\n", "\n", "\n" ] } ], "source": [ "import time\n", "\n", "\n", "def my_generator():\n", " print('start')\n", " yield '1'\n", " yield '2'\n", " time.sleep(1)\n", " yield '3'\n", "\n", "\n", "g = my_generator()\n", "i = next(g)\n", "print(i)\n", "\n", "for i in g:\n", " print(i)\n", "\n", "\n", "print(type(g))\n", "\n", "generator_exp = (i for i in range(3))\n", "\n", "print(type(generator_exp))" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "A coroutine is a generator function that can both yield values and accept values from the outside." ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": 14, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "return 1\n", "Received None\n", "return 2\n", "Received data\n" ] }, { "ename": "StopIteration", "evalue": "", "output_type": "error", "traceback": [ "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", "\u001B[0;31mStopIteration\u001B[0m Traceback (most recent call last)", "Cell \u001B[0;32mIn [14], line 14\u001B[0m\n\u001B[1;32m 12\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[38;5;28mnext\u001B[39m(g))\n\u001B[1;32m 13\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[38;5;28mnext\u001B[39m(g))\n\u001B[0;32m---> 14\u001B[0m \u001B[38;5;28mprint\u001B[39m(\u001B[43mg\u001B[49m\u001B[38;5;241;43m.\u001B[39;49m\u001B[43msend\u001B[49m\u001B[43m(\u001B[49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[38;5;124;43mdata\u001B[39;49m\u001B[38;5;124;43m'\u001B[39;49m\u001B[43m)\u001B[49m)\n", "\u001B[0;31mStopIteration\u001B[0m: " ] } ], "source": [ "# Generator based coroutine\n", "\n", "def my_generator():\n", " rec = yield \"return 1\"\n", " print(f\"Received {rec}\")\n", " rec = yield \"return 2\"\n", " print(f\"Received {rec}\")\n", "\n", "\n", "g = my_generator()\n", "\n", "print(next(g))\n", "print(next(g))\n", "print(g.send('data'))" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "## Code examples" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# running a coroutine\n", "\n", "import asyncio\n", "\n", "async def coro():\n", " await asyncio.sleep(0.5)\n", "\n", "asyncio.run(coro())" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n", "is_executing": true } } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# schedules the coroutine in the current loop\n", "task = asyncio.ensure_future(coro_or_future)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": 2, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1\n", "2\n" ] } ], "source": [ "# async generator\n", "\n", "import asyncio\n", "\n", "\n", "async def gen():\n", " await asyncio.sleep(1)\n", " yield 1\n", " await asyncio.sleep(1)\n", " yield 2\n", "\n", "\n", "async for res in gen():\n", " print(res)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "markdown", "source": [ "Async generators support was added in Python 3.6 ([PEP 525 – Asynchronous Generators](https://peps.python.org/pep-0525/))" ], "metadata": { "collapsed": false } }, { "cell_type": "code", "source": [ "# async comprehensions\n", "\n", "result = [await fun() for fun in funcs]\n", "result = {await fun() for fun in funcs}\n", "result = {fun: await fun() for fun in funcs}\n", "\n", "result = [await fun() for fun in funcs if await smth]\n", "result = {await fun() for fun in funcs if await smth}\n", "result = {fun: await fun() for fun in funcs if await smth}\n", "\n", "result = [await fun() async for fun in funcs]\n", "result = {await fun() async for fun in funcs}\n", "result = {fun: await fun() async for fun in funcs}\n", "\n", "result = [await fun() async for fun in funcs if await smth]\n", "result = {await fun() async for fun in funcs if await smth}\n", "result = {fun: await fun() async for fun in funcs if await smth}" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "Async comprehensions support was added in Python 3.6 ([PEP 530 – Asynchronous Comprehensions](https://peps.python.org/pep-0530/))" ], "metadata": { "collapsed": false } }, { "cell_type": "markdown", "source": [ "## Overview\n", "\n", "`asyncio` success:\n", "- Growing world connectivity and a request for event-driven networking\n", "- A part of stdlib\n", "- Unified ioloop interface\n", "- Native coroutines and async/await syntax\n", "- Wide support\n", "\n", "" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%% md\n" } } } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.8" } }, "nbformat": 4, "nbformat_minor": 5 }