{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Context manager cookbook.\n", "### Non exhaustive list of examples with `with` statement\n", "\n", "GitHub doesn't render large Jupyter Notebooks, so just in case, here is an nbviewer [link to the notebook](https://nbviewer.jupyter.org/url/github.com/RadoslawB/learning-machine-learning/blob/master/notebooks/python-api/context-manager-examples.ipynb)." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:30:35.110520Z", "start_time": "2021-04-14T09:30:35.107509Z" } }, "outputs": [], "source": [ "from contextlib import contextmanager" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Change print color" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:30:37.915936Z", "start_time": "2021-04-14T09:30:37.910044Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001B[34mChanges color in context\n", "\u001B[39mOutside the context with default color\n" ] } ], "source": [ "@contextmanager\n", "def print_blue():\n", " print('\\033[34m', end='')\n", " yield\n", " print('\\033[39m', end='')\n", "\n", "with print_blue():\n", " print('Changes color in context')\n", "\n", "print('Outside the context with default color')" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2021-04-13T19:03:50.848001Z", "start_time": "2021-04-13T19:03:50.845834Z" } }, "source": [ "## Silently ignore error\n", "\n", "Intentionally, suppress expected error. This approach reduces visual noise of try/except" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:30:38.634142Z", "start_time": "2021-04-14T09:30:38.630168Z" } }, "outputs": [], "source": [ "from contextlib import suppress\n", "import os\n", "with suppress(FileNotFoundError):\n", " os.remove('file.txt')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "compared" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Logging level change only in context" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:04.931016Z", "start_time": "2021-04-14T09:31:04.926074Z" } }, "outputs": [], "source": [ "\n", "import logging\n", "\n", "@contextmanager\n", "def debug_logging(logger_name: str, level: int):\n", " logger = logging.getLogger(logger_name)\n", " old_level = logger.getEffectiveLevel()\n", " logger.setLevel(level)\n", " try:\n", " yield logger\n", " finally:\n", " logger.setLevel(old_level)\n", "\n", "with debug_logging('my-logger', logging.DEBUG) as logger:\n", " logger.debug('This will be printed')\n", "\n", "logging\\\n", ".getLogger('my-logger')\\\n", ".info('This wont be logged because default level is WARNING')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Tag maker" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.197900Z", "start_time": "2021-04-14T09:31:05.194586Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "
Tag body
" ] } ], "source": [ "@contextmanager\n", "def tag(name):\n", " print(f'<{name}>', end='')\n", " yield\n", " print(f'', end='')\n", "\n", "with tag('header'):\n", " print('Tag body', end='')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Indenter" ] }, { "cell_type": "code", "execution_count": 6, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.269542Z", "start_time": "2021-04-14T09:31:05.264958Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "def mimic_python_syntax():\n", " s = \"Hello World\"\n", " print(s)\n", "\n", "mimic_python_syntax()\n" ] } ], "source": [ "class Indenter:\n", " def __init__(self):\n", " self.level = 0\n", "\n", " def __enter__(self):\n", " self.level += 1\n", " return self\n", "\n", " def __exit__(self, exc_type, exc_val, exc_tb):\n", " self.level -= 1\n", "\n", " def print(self, text):\n", " print(' ' * (self.level-1) + text)\n", "\n", "with Indenter() as indenter:\n", " indenter.print('def mimic_python_syntax():')\n", " with indenter:\n", " indenter.print('s = \"Hello World\"')\n", " indenter.print('print(s)')\n", "\n", " indenter.print('\\nmimic_python_syntax()')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## List oprations with transaction\n", "Make copy of input list items, work on copy. If there is no error, replace input list items with items from list used in context." ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.344491Z", "start_time": "2021-04-14T09:31:05.341809Z" } }, "outputs": [], "source": [ "@contextmanager\n", "def list_transaction(list_: list):\n", " working = list(list_)\n", " yield working\n", " list_[:] = working" ] }, { "cell_type": "code", "execution_count": 8, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.460649Z", "start_time": "2021-04-14T09:31:05.368144Z" } }, "outputs": [ { "ename": "RuntimeError", "evalue": "", "output_type": "error", "traceback": [ "\u001B[0;31m---------------------------------------------------------------------------\u001B[0m", "\u001B[0;31mRuntimeError\u001B[0m Traceback (most recent call last)", "\u001B[0;32m\u001B[0m in \u001B[0;36m\u001B[0;34m\u001B[0m\n\u001B[1;32m 3\u001B[0m \u001B[0;32mwith\u001B[0m \u001B[0mlist_transaction\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0mitems\u001B[0m\u001B[0;34m)\u001B[0m \u001B[0;32mas\u001B[0m \u001B[0mworking\u001B[0m\u001B[0;34m:\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[1;32m 4\u001B[0m \u001B[0mworking\u001B[0m\u001B[0;34m.\u001B[0m\u001B[0mappend\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;36m4\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0;32m----> 5\u001B[0;31m \u001B[0;32mraise\u001B[0m \u001B[0mRuntimeError\u001B[0m\u001B[0;34m(\u001B[0m\u001B[0;34m)\u001B[0m\u001B[0;34m\u001B[0m\u001B[0;34m\u001B[0m\u001B[0m\n\u001B[0m", "\u001B[0;31mRuntimeError\u001B[0m: " ] } ], "source": [ "items = [1,2,3]\n", "\n", "with list_transaction(items) as working:\n", " working.append(4)\n", " raise RuntimeError()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.462461Z", "start_time": "2021-04-14T09:31:05.393Z" } }, "outputs": [], "source": [ "items" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.463855Z", "start_time": "2021-04-14T09:31:05.427Z" } }, "outputs": [], "source": [ "with list_transaction(items) as working:\n", " working.append(4)\n", "\n", "print(items)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Database transation\n", "Rolback changes if error occurs in context" ] }, { "cell_type": "code", "execution_count": 9, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.569108Z", "start_time": "2021-04-14T09:31:05.565877Z" } }, "outputs": [], "source": [ "class Transaction:\n", " def __init__(self, connection):\n", " self.connection = connection\n", " def __enter__(self):\n", " return self.connection\n", "\n", " def __exit__(self, err_type, err_value, err_traceback):\n", " if err_type:\n", " self.connection.rollback()\n", " else:\n", " self.connection.commit()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.573599Z", "start_time": "2021-04-14T09:31:05.571312Z" } }, "outputs": [], "source": [ "import sqlite3\n", "\n", "connection = sqlite3.connect('')" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.579020Z", "start_time": "2021-04-14T09:31:05.575105Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "[(1, 'Name 1')]\n" ] } ], "source": [ "with Transaction(connection) as t:\n", " t.execute(\"\"\"\n", " CREATE TABLE Users\n", " (\n", " id INTEGER PRIMARY KEY,\n", " name TEXT NOT NULL\n", " )\n", " \"\"\")\n", " t.execute(\"\"\"\n", " INSERT INTO Users\n", " (id, name) VALUES (1, 'Name 1')\n", " \"\"\")\n", "\n", " print(t.execute(\"\"\"SELECT * FROM Users\"\"\").fetchall())" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2021-04-12T17:50:53.149861Z", "start_time": "2021-04-12T17:50:53.146985Z" } }, "source": [ "## Lazy connection\n", "example from [Python Cookbook 3rd Edition](https://www.goodreads.com/book/show/17152735-python-cookbook)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.636726Z", "start_time": "2021-04-14T09:31:05.632128Z" } }, "outputs": [], "source": [ "from socket import socket, AF_INET, SOCK_STREAM\n", "\n", "class LazyConnection:\n", " def __init__(self, address, family=AF_INET, type=SOCK_STREAM):\n", " self.address = address\n", " self.family = AF_INET\n", " self.type = SOCK_STREAM\n", " self.sock = None\n", " def __enter__(self):\n", " if self.sock is not None:\n", " raise RuntimeError('Already connected')\n", "\n", " self.sock = socket(self.family, self.type)\n", " self.sock.connect(self.address)\n", "\n", " return self.sock\n", " def __exit__(self, exc_ty, exc_val, tb):\n", " self.sock.close()\n", " self.sock = None\n" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:05.988718Z", "start_time": "2021-04-14T09:31:05.665586Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "b'HTTP/1.1 301 Moved Permanently\\r\\nServer: Varnish\\r\\nRetry-After: 0\\r\\nLocation: https://www.python.org/index.html\\r\\nContent-Length: 0\\r\\nAccept-Ranges: bytes\\r\\nDate: Wed, 14 Apr 2021 09:31:05 GMT\\r\\nVia: 1.1 varnish\\r\\nConnection: close\\r\\nX-Served-By: cache-ams21030-AMS\\r\\nX-Cache: HIT\\r\\nX-Cache-Hits: 0\\r\\nX-Timer: S1618392666.941079,VS0,VE0\\r\\nStrict-Transport-Security: max-age=63072000; includeSubDomains\\r\\n\\r\\n'\n" ] } ], "source": [ "from functools import partial\n", "connection = LazyConnection(('www.python.org', 80))\n", "\n", "with connection as s:\n", " s.send(b'GET /index.html HTTP/1.0\\r\\n')\n", " s.send(b'Host: www.python.org\\r\\n')\n", " s.send(b'\\r\\n')\n", " resp = b''.join(iter(partial(s.recv, 8192), b''))\n", " print(resp)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Stopwatch" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:07.001137Z", "start_time": "2021-04-14T09:31:05.990797Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Sleeping: 1.0047638416290283\n" ] } ], "source": [ "import time\n", "@contextmanager\n", "def stopwatch(label: str):\n", " start = time.time()\n", " try:\n", " yield\n", " finally:\n", " end = time.time()\n", " print(f'{label}: {end - start}')\n", "\n", "\n", "with stopwatch('Sleeping'):\n", " time.sleep(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Stopwatch with output callable" ] }, { "cell_type": "code", "execution_count": 15, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:07.013223Z", "start_time": "2021-04-14T09:31:07.007429Z" } }, "outputs": [], "source": [ "import time\n", "\n", "class Stopwatch:\n", " def __init__(self, output_callable):\n", " self.output_callable = output_callable\n", "\n", " def __enter__(self):\n", " self.start = time.time()\n", "\n", " def __exit__(self, err_type, err_value, err_traceback):\n", " end = time.time()\n", "\n", " self.output_callable(end - self.start)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:08.024064Z", "start_time": "2021-04-14T09:31:07.016759Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "INFO:stopwatch:1.00174880027771\n" ] } ], "source": [ "import logging\n", "\n", "logging.basicConfig()\n", "\n", "logger = logging.getLogger('stopwatch')\n", "logger.setLevel(logging.DEBUG)\n", "\n", "with Stopwatch(logger.info):\n", " time.sleep(1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enter the same target multiple times" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:08.034928Z", "start_time": "2021-04-14T09:31:08.028312Z" } }, "outputs": [], "source": [ "class MultilevelStopwatch:\n", " def __init__(self):\n", " self.levels = []\n", "\n", " def __enter__(self):\n", " self.levels.append(time.time())\n", " return self\n", "\n", " def __exit__(self, err_type, err_value, err_traceback):\n", " latest = self.levels.pop()\n", " end = time.time()\n", "\n", " print(f'Level {len(self.levels)+1} took: {end - latest}')" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:09.045554Z", "start_time": "2021-04-14T09:31:08.037719Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Level 2 took: 0.5035400390625\n", "Level 1 took: 1.0045082569122314\n" ] } ], "source": [ "with MultilevelStopwatch() as ms:\n", " time.sleep(.5)\n", " with ms:\n", " time.sleep(.5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## HTTP Session\n", "reuse of TCP connection to improve performance" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:09.049455Z", "start_time": "2021-04-14T09:31:09.047401Z" } }, "outputs": [], "source": [ "# install requests if necessary\n", "# !pip3 install requests" ] }, { "cell_type": "code", "execution_count": 20, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.244383Z", "start_time": "2021-04-14T09:31:09.052471Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Using context manager: 7.110113143920898\n", "Establishing HTTP connection for every request: 10.991943836212158\n" ] } ], "source": [ "import requests\n", "\n", "n = 20\n", "\n", "with stopwatch('Using context manager'):\n", " with requests.Session() as session:\n", " for _ in range(n):\n", " session.get(\"http://httpbin.org/cookies/set/sessioncookie/123456789\")\n", "\n", "with stopwatch('Establishing HTTP connection for every request'):\n", " for _ in range(n):\n", " requests.get(\"http://httpbin.org/cookies/set/sessioncookie/123456789\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Nested context manager" ] }, { "cell_type": "code", "execution_count": 21, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.254435Z", "start_time": "2021-04-14T09:31:27.249466Z" } }, "outputs": [], "source": [ "from contextlib import contextmanager\n", "\n", "\n", "@contextmanager\n", "def get_state(name):\n", " print(\"entering:\", name)\n", " try:\n", " yield name\n", " finally:\n", " print(\"exiting:\", name)" ] }, { "cell_type": "code", "execution_count": 22, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.261737Z", "start_time": "2021-04-14T09:31:27.257322Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "entering: A\n", "entering: B\n", "entering: C\n", "inside with statement: A B C\n", "exiting: C\n", "exiting: B\n", "exiting: A\n" ] } ], "source": [ "\n", "with get_state(\"A\") as A, get_state('B') as B, get_state(\"C\") as C:\n", " print(\"inside with statement:\", A, B, C)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Nesting with exit stack" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Example above written using ExitStack" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.269473Z", "start_time": "2021-04-14T09:31:27.263789Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "entering: A\n", "entering: B\n", "entering: C\n", "Inside\n", "exiting: C\n", "exiting: B\n", "exiting: A\n" ] } ], "source": [ "from contextlib import ExitStack\n", "\n", "with ExitStack() as es:\n", " es.enter_context(get_state('A'))\n", " es.enter_context(get_state('B'))\n", " es.enter_context(get_state('C'))\n", " print('Inside')" ] }, { "cell_type": "markdown", "metadata": { "ExecuteTime": { "end_time": "2021-04-06T12:48:28.441857Z", "start_time": "2021-04-06T12:48:28.437484Z" } }, "source": [ "## Nested context with errors.\n", "Previously opened contexts exit casually" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.279893Z", "start_time": "2021-04-14T09:31:27.276374Z" } }, "outputs": [], "source": [ "@contextmanager\n", "def raise_error(name, err):\n", " print(\"entering:\", name)\n", " try:\n", " raise err()\n", " finally:\n", " print('exiting:', name)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.286242Z", "start_time": "2021-04-14T09:31:27.281966Z" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "entering: A\n", "entering: B\n", "exiting: B\n", "exiting: A\n", "Caught error \n" ] } ], "source": [ "try:\n", " with get_state(\"A\") as A, raise_error('B', RuntimeError) as B, get_state(\"C\") as C:\n", " print('Inside')\n", "except RuntimeError as e:\n", " print('Caught error', e)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Extract logging and error handling" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "ExecuteTime": { "end_time": "2021-04-14T09:31:27.295665Z", "start_time": "2021-04-14T09:31:27.290018Z" } }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "ERROR:root:ZeroDivisionError\n", "Traceback (most recent call last):\n", " File \"\", line 19, in errorhandler\n", " yield\n", " File \"\", line 31, in __call__\n", " return a / b\n", "ZeroDivisionError: division by zero\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Custom handling of Zero Division Error! Printing only 2 levels of traceback..\n" ] } ], "source": [ "import logging\n", "from contextlib import contextmanager\n", "import traceback\n", "import sys\n", "\n", "logging.getLogger(__name__)\n", "\n", "logging.basicConfig(\n", " level=logging.INFO,\n", " format=\"\\n(asctime)s [%(levelname)s] %(message)s\",\n", ")\n", "\n", "\n", "class Divider:\n", "\n", " @contextmanager\n", " def errorhandler(self):\n", " try:\n", " yield\n", " except ZeroDivisionError:\n", " print(\n", " f\"Custom handling of Zero Division Error! Printing \"\n", " \"only 2 levels of traceback..\"\n", " )\n", " logging.exception(\"ZeroDivisionError\")\n", "\n", " def __call__(self, a, b):\n", " \"\"\"Function that we want to save from nasty error handling logic.\"\"\"\n", "\n", " with self.errorhandler():\n", " return a / b\n", "\n", "\n", "divide = Divider()\n", "divide(2, 0)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Refference:\n", "\n", "- [Python Cookbook](https://www.goodreads.com/book/show/17152735-python-cookbook)\n", "- [Effective Python](https://www.goodreads.com/book/show/48566725-effective-python?from_search=true&from_srp=true&qid=8WDIFQEqZS&rank=2)\n", "- [Python in a Nutshell](https://www.goodreads.com/book/show/80438.Python_in_a_Nutshell)\n", "- https://dbader.org/blog/python-context-managers-and-with-statement\n", "- https://rednafi.github.io/digressions/python/2020/03/26/python-contextmanager.html\n", "- https://www.youtube.com/watch?v=BzOc6AEvfh8&t=13s&ab_channel=PyCharmbyJetBrains" ] } ], "metadata": { "hide_input": false, "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" }, "toc": { "base_numbering": 1, "nav_menu": {}, "number_sections": false, "sideBar": true, "skip_h1_title": false, "title_cell": "Table of Contents", "title_sidebar": "Contents", "toc_cell": false, "toc_position": {}, "toc_section_display": true, "toc_window_display": false } }, "nbformat": 4, "nbformat_minor": 5 }