{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": { "hide_input": false }, "outputs": [], "source": [ "#|hide\n", "#default_exp test\n", "from nbdev.showdoc import show_doc" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "from nbdev.imports import *\n", "from nbdev.sync import *\n", "from nbdev.export import *\n", "from nbdev.export import _mk_flag_re\n", "from nbdev.export2html import _re_notebook2script\n", "from fastcore.script import *\n", "from fastcore.parallel import *\n", "\n", "from nbconvert.preprocessors import ExecutePreprocessor\n", "import nbformat" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Extract tests\n", "\n", "> The functions that grab the cells containing tests (filtering with potential flags) and execute them" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Everything that is not an exported cell is considered a test, so you should make sure your notebooks can all run smoothly (and fast) if you want to use this functionality as the CLI. You can mark some cells with special flags (like slow) to make sure they are only executed when you authorize it. Those flags should be configured in your `settings.ini` (separated by a `|` if you have several of them). You can also apply flags to one entire notebook by using the `all` option, e.g. `#all_slow`, in code cells.\n", "\n", "If `tst_flags=slow|fastai` in `settings.ini`, you can:\n", "- mark slow tests with `#slow` flag\n", "- mark tests that depend on fastai with the `#fastai` flag." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Detect flags" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following functions detect the cells that should be excluded from the tests (unless their special flag is passed)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "class _ReTstFlags():\n", " \"Test flag matching regular expressions\"\n", " def __init__(self, all_flag):\n", " \"match flags applied to all cells?\"\n", " self.all_flag = all_flag\n", "\n", " def _deferred_init(self):\n", " \"Compile at first use but not before since patterns need `get_config().tst_flags`\"\n", " if hasattr(self, '_re'): return\n", " tst_flags = get_config().get('tst_flags', '')\n", " tst_flags += f'|skip' if tst_flags else 'skip'\n", " _re_all = 'all_' if self.all_flag else ''\n", " self._re = _mk_flag_re(f\"{_re_all}({tst_flags})\", 0, \"Any line with a test flag\")\n", "\n", " def findall(self, source):\n", " self._deferred_init()\n", " return self._re.findall(source)\n", "\n", " def search(self, source):\n", " self._deferred_init()\n", " return self._re.search(source)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "_re_all_flag = _ReTstFlags(True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "def get_all_flags(cells):\n", " \"Check for all test flags in `cells`\"\n", " result = []\n", " for cell in cells:\n", " if cell['cell_type'] == 'code': result.extend(_re_all_flag.findall(cell['source']))\n", " return set(result)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "nb = read_nb(\"04_test.ipynb\")\n", "assert get_all_flags(nb['cells']) == set()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|hide\n", "tst_flags_bck=get_config().get('tst_flags')\n", "try:\n", " get_config()['tst_flags'] = 'fastai|vslow'\n", " if hasattr(_re_all_flag, '_re'): del _re_all_flag._re\n", " cells = [{'cell_type': cell_type, 'source': source} for cell_type, source in [\n", " ('code', '# export\\nfrom local.core import *'), \n", " ('markdown', '# title of some kind'), \n", " ('code', '# all_vslow \\n# all_fastai'),\n", " ('code', '#all_vslow\\n# all_fastai'),\n", " ('code', '#all_vslow\\n#all_skip'),\n", " ('code', '# all_fastai'),\n", " ('code', '#all_fastai\\n')]]\n", " for i in range(3):\n", " test_eq(set(['vslow','fastai','skip']), get_all_flags(cells))\n", " cells.pop(2)\n", " for i in range(2):\n", " test_eq(set(['fastai']), get_all_flags(cells))\n", " cells.pop(2)\n", " test_eq(set(), get_all_flags(cells))\n", "finally:\n", " get_config()['tst_flags'] = tst_flags_bck\n", " del _re_all_flag._re" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "_re_flags = _ReTstFlags(False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "def get_cell_flags(cell):\n", " \"Check for any special test flag in `cell`\"\n", " if cell['cell_type'] != 'code' or len(get_config().get('tst_flags',''))==0: return []\n", " return _re_flags.findall(cell['source'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_eq(get_cell_flags({'cell_type': 'code', 'source': \"#hide\\n\"}), [])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|hide\n", "from nbformat.v4 import new_code_cell\n", "\n", "for expected, flag in [(['fastai'], 'fastai'), ([], 'vslow')]:\n", " test_eq(expected, get_cell_flags(new_code_cell(f\"#hide\\n# {flag}\\n\")))\n", " test_eq(expected, get_cell_flags(new_code_cell(f\"# {flag}\\n#hide\\n\")))\n", " test_eq(expected, get_cell_flags(new_code_cell(f\"#{flag}\\n#hide\\n\")))\n", " test_eq([], get_cell_flags(new_code_cell(\"#hide\\n\")))\n", " test_eq([], get_cell_flags(new_code_cell(f\"# all_{flag}\")))\n", " test_eq([], get_cell_flags(new_code_cell(f\"#all_{flag}\")))\n", "tst_flags_bck=get_config().get('tst_flags')\n", "try:\n", " get_config()['tst_flags'] = 'fastai|vslow'\n", " del _re_flags._re\n", " test_eq(['vslow'], get_cell_flags(new_code_cell(f\"#hide\\n# vslow\\n\")))\n", " test_eq(['vslow'], get_cell_flags(new_code_cell(f\"#hide\\n#vslow\\n\")))\n", " test_eq(['vslow', 'fastai'], get_cell_flags(new_code_cell(f\"#hide\\n# vslow\\n# fastai\")))\n", " test_eq(['fastai', 'vslow'], get_cell_flags(new_code_cell(f\"#fastai\\n#vslow\")))\n", "finally:\n", " get_config()['tst_flags'] = tst_flags_bck\n", " del _re_flags._re" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Testing a notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "class NoExportPreprocessor(ExecutePreprocessor):\n", " \"An `ExecutePreprocessor` that executes cells that don't have a flag in `flags`\"\n", " def __init__(self, flags, **kwargs):\n", " self.flags = flags\n", " super().__init__(**kwargs)\n", "\n", " def preprocess_cell(self, cell, resources, index):\n", " if 'source' not in cell or cell['cell_type'] != \"code\": return cell, resources\n", " for f in get_cell_flags(cell):\n", " if f not in self.flags: return cell, resources\n", " if check_re(cell, _re_notebook2script): return cell, resources\n", " return super().preprocess_cell(cell, resources, index)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "def test_nb(fn, flags=None):\n", " \"Execute tests in notebook in `fn` with `flags`\"\n", " os.environ[\"IN_TEST\"] = '1'\n", " if flags is None: flags = []\n", " try:\n", " nb = read_nb(fn)\n", " for f in get_all_flags(nb['cells']):\n", " if f not in flags: return\n", " ep = NoExportPreprocessor(flags, timeout=600, kernel_name='python3')\n", " pnb = nbformat.from_dict(nb)\n", " ep.preprocess(pnb, {})\n", " finally: os.environ.pop(\"IN_TEST\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_nb('index.ipynb')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "def _test_one(fname, flags=None, verbose=True):\n", " print(f\"testing {fname}\")\n", " start = time.time()\n", " try:\n", " test_nb(fname, flags=flags)\n", " return True,time.time()-start\n", " except Exception as e:\n", " if \"ZMQError\" in str(e): _test_one(item, flags=flags, verbose=verbose)\n", " if verbose: print(f'Error in {fname}:\\n{e}')\n", " return False,time.time()-start" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "@call_parse\n", "def nbdev_test_nbs(\n", " fname:str=None, # A notebook name or glob to convert\n", " flags:str=None, # Space separated list of flags\n", " n_workers:int=None, # Number of workers to use\n", " verbose:bool_arg=True, # Print errors along the way\n", " timing:bool=False, # Timing each notebook to see the ones are slow\n", " pause:float=0.5 # Pause time (in secs) between notebooks to avoid race conditions\n", "):\n", " \"Test in parallel the notebooks matching `fname`, passing along `flags`\"\n", " if flags is not None: flags = flags.split(' ')\n", " files = nbglob(fname)\n", " files = [Path(f).absolute() for f in sorted(files)]\n", " assert len(files) > 0, \"No files to test found.\"\n", " if n_workers is None: n_workers = 0 if len(files)==1 else min(num_cpus(), 8)\n", " # make sure we are inside the notebook folder of the project\n", " os.chdir(get_config().path(\"nbs_path\"))\n", " results = parallel(_test_one, files, flags=flags, verbose=verbose, n_workers=n_workers, pause=pause)\n", " passed,times = [r[0] for r in results],[r[1] for r in results]\n", " if all(passed): print(\"All tests are passing!\")\n", " else:\n", " msg = \"The following notebooks failed:\\n\"\n", " raise Exception(msg + '\\n'.join([f.name for p,f in zip(passed,files) if not p]))\n", " if timing:\n", " for i,t in sorted(enumerate(times), key=lambda o:o[1], reverse=True):\n", " print(f\"Notebook {files[i].name} took {int(t)} seconds\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#|export\n", "@call_parse\n", "def nbdev_read_nbs(\n", " fname:str=None # A notebook name or glob to convert\n", "):\n", " \"Check all notebooks matching `fname` can be opened\"\n", " files = nbglob(fname)\n", " for nb in files:\n", " try: _ = read_nb(nb)\n", " except Exception as e:\n", " print(f\"{nb} is corrupted and can't be opened.\")\n", " raise e" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Export-" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Converted 00_export.ipynb.\n", "Converted 01_sync.ipynb.\n", "Converted 02_showdoc.ipynb.\n", "Converted 03_export2html.ipynb.\n", "Converted 04_test.ipynb.\n", "Converted 05_merge.ipynb.\n", "Converted 06_cli.ipynb.\n", "Converted 07_clean.ipynb.\n", "Converted 99_search.ipynb.\n", "Converted example.ipynb.\n", "Converted index.ipynb.\n", "Converted nbdev_comments.ipynb.\n", "Converted tutorial.ipynb.\n", "Converted tutorial_colab.ipynb.\n" ] } ], "source": [ "#|hide\n", "from nbdev.export import *\n", "notebook2script()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 2 }