{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "Async/Await and Non-Blocking Execution\n", "=======================================\n", "\n", "Dask integrates natively with concurrent applications built on the [Tornado](https://www.tornadoweb.org/en/stable/) or [asyncio](https://docs.python.org/3/library/asyncio.html) frameworks, and can make use of Python's `async` and `await` keywords.\n", "\n", "This notebook shows a small example of how to start a Dask Client in asynchronous mode." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `asynchronous=True` parameter\n", "---------------------------------\n", "\n", "Dask LocalCluster and Client objects can operate in async-await mode if you pass the `asynchronous=True` parameter." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from dask.distributed import Client\n", "client = await Client(asynchronous=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def inc(x: int) -> int:\n", "    return x + 1\n", "\n", "future = client.submit(inc, 10)\n", "future" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "await future" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Collections\n", "-----------\n", "\n", "Note that blocking operations like the `.compute()` method should not be used in asynchronous mode. Instead, use the `Client.compute` method, which returns a future that you can await." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import dask\n", "df = dask.datasets.timeseries()\n", "df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = df.persist() # persist is non-blocking, so it's ok" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total = df[['x', 'y']].sum() # lazy computations are also ok" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# total.compute() # but compute is bad, because compute blocks until done" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "future = client.compute(total)\n", "future" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "await future" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Within a script\n", "---------------\n", "\n", "Running async/await code in Jupyter is a bit atypical. Jupyter already has an event loop running, so it's easy to use async/await syntax directly within it. In a normal Python script this won't be the case. 
Here is an example that should run in a normal Python interpreter or as a script.\n", "\n", "```python\n", "import asyncio\n", "from dask.distributed import Client\n", "\n", "\n", "def inc(x: int) -> int:\n", "    return x + 1\n", "\n", "\n", "async def f():\n", "    async with Client(asynchronous=True) as client:\n", "        future = client.submit(inc, 10)\n", "        result = await future\n", "        print(result)\n", "\n", "\n", "if __name__ == '__main__':\n", "    asyncio.run(f())\n", "```" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 4 }