{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "bce55715", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "#|default_exp process" ] }, { "cell_type": "markdown", "id": "f6b8b4a6", "metadata": {}, "source": [ "# process\n", "> A notebook processor" ] }, { "cell_type": "code", "execution_count": null, "id": "59172c3d", "metadata": {}, "outputs": [], "source": [ "#|export\n", "from nbdev.config import *\n", "from nbdev.maker import *\n", "from nbdev.imports import *\n", "\n", "from execnb.nbio import *\n", "from fastcore.script import *\n", "from fastcore.imports import *\n", "\n", "from collections import defaultdict" ] }, { "cell_type": "code", "execution_count": null, "id": "ce00cb74", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "from fastcore.test import *\n", "from pdb import set_trace\n", "from importlib import reload\n", "from fastcore import shutil" ] }, { "cell_type": "markdown", "id": "cbe57d96", "metadata": {}, "source": [ "Special comments at the start of a cell can be used to provide information to `nbdev` about how to process a cell, so we need to be able to find the location of these comments." ] }, { "cell_type": "code", "execution_count": null, "id": "48fdaa8d", "metadata": {}, "outputs": [], "source": [ "minimal = read_nb('..//tests/minimal.ipynb')" ] }, { "cell_type": "code", "execution_count": null, "id": "48e04902-f4a1-4247-ae0c-1e30bc054166", "metadata": {}, "outputs": [], "source": [ "#|export\n", "# from https://github.com/quarto-dev/quarto-cli/blob/main/src/resources/jupyter/notebook.py\n", "langs = defaultdict(\n", " lambda: '#', r = \"#\", python = \"#\", julia = \"#\", scala = \"//\", matlab = \"%\", csharp = \"//\", fsharp = \"//\",\n", " c = [\"/*\",\"*/\"], css = [\"/*\",\"*/\"], sas = [\"*\",\";\"], powershell = \"#\", bash = \"#\", sql = \"--\", mysql = \"--\", psql = \"--\",\n", " lua = \"--\", cpp = \"//\", cc = \"//\", stan = \"#\", octave = \"#\", fortran = \"!\", fortran95 = \"!\", awk = \"#\", gawk = \"#\", stata = \"*\",\n", " java = \"//\", groovy = \"//\", sed = \"#\", perl = \"#\", ruby = \"#\", tikz = \"%\", javascript = \"//\", js = \"//\", d3 = \"//\", node = \"//\",\n", " sass = \"//\", coffee = \"#\", go = \"//\", asy = \"//\", haskell = \"--\", dot = \"//\", apl = \"⍝\")" ] }, { "cell_type": "code", "execution_count": null, "id": "c54390ab-1cb9-4146-9e88-609f1fc5544a", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def nb_lang(nb): return nested_attr(nb, 'metadata.kernelspec.language', 'python')" ] }, { "cell_type": "code", "execution_count": null, "id": "92e96f7d-a7b7-4915-a660-d659520081d5", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "test_eq(nb_lang(read_nb('..//tests/minimal.ipynb')), 'python')\n", "test_eq(nb_lang(read_nb('..//tests/APL.ipynb')), 'apl')" ] }, { "cell_type": "code", "execution_count": null, "id": "2974d8cc-870c-4362-b2b1-e35f94c28dd9", "metadata": {}, "outputs": [], "source": [ "#|export\n", "\n", "def _dir_pre(lang=None): return fr\"\\s*{langs[lang]}\\s*\\|\"\n", "def _quarto_re(lang=None): return re.compile(_dir_pre(lang) + r'\\s*[\\w|-]+\\s*:')" ] }, { "cell_type": "code", "execution_count": null, "id": "deedd077-9503-44af-8197-5558f46a3255", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "assert _quarto_re().match('#|code-fold: show')\n", "assert _quarto_re().match('#|hide: true')\n", "assert not _quarto_re().match('#|code fold: show') #not a valid quarto directive" ] }, { "cell_type": "code", "execution_count": null, "id": "484a1df8", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def _directive(s, lang='python'):\n", " s = re.sub('^'+_dir_pre(lang), f\"{langs[lang]}|\", s)\n", " if ':' in s: s = s.replace(':', ': ')\n", " s = (s.strip()[2:]).strip().split()\n", " if not s: return None\n", " direc,*args = s\n", " return direc,args" ] }, { "cell_type": "code", "execution_count": null, "id": "100a91fb-8b46-4feb-afbf-d1e78c14cfd6", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def _norm_quarto(s, lang='python'):\n", " \"normalize quarto directives so they have a space after the colon\"\n", " m = _quarto_re(lang).match(s)\n", " return m.group(0) + ' ' + _quarto_re(lang).sub('', s).lstrip() if m else s" ] }, { "cell_type": "code", "execution_count": null, "id": "b496a9b3-b90b-4831-9454-4c4c3b8e89f1", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "test_eq(_norm_quarto('#|foo:bar'), '#|foo: bar')\n", "test_eq(_norm_quarto('#|foo: bar'), '#|foo: bar')\n", "test_eq(_norm_quarto('#|not_quarto'), '#|not_quarto')" ] }, { "cell_type": "code", "execution_count": null, "id": "69befb76-aa77-4923-bfcc-2606613d706f", "metadata": {}, "outputs": [], "source": [ "#|export\n", "_cell_mgc = re.compile(r\"^\\s*%%\\w+\")\n", "\n", "def first_code_ln(code_list, re_pattern=None, lang='python'):\n", " \"get first line number where code occurs, where `code_list` is a list of code\"\n", " if re_pattern is None: re_pattern = _dir_pre(lang)\n", " return first(i for i,o in enumerate(code_list) if o.strip() != '' and not re.match(re_pattern, o) and not _cell_mgc.match(o))" ] }, { "cell_type": "code", "execution_count": null, "id": "86547300-342c-4b8e-917d-f09c7cd1e01f", "metadata": {}, "outputs": [], "source": [ "_tst = \"\"\" \n", "#|default_exp\n", " #|export\n", "#|hide_input\n", "foo\n", "\"\"\"\n", "test_eq(first_code_ln(_tst.splitlines(True)), 4)" ] }, { "cell_type": "code", "execution_count": null, "id": "c73ab85a-e4d7-43cc-9eb3-995e740fa532", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "\n", "# test for cell magics\n", "_tst = \"\"\"%%timeit\n", "#|hide\n", " #|export\n", "foo\n", "\"\"\"\n", "test_eq(first_code_ln(_tst.splitlines(True)), 3)\n", "\n", "# test when there is line magic\n", "_tst = \"\"\"\n", "#|hide\n", "%line_magic\n", " #|export\n", "foo\n", "\"\"\"\n", "test_eq(first_code_ln(_tst.splitlines(True)),2)" ] }, { "cell_type": "code", "execution_count": null, "id": "3cca78a4", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def extract_directives(cell, remove=True, lang='python'):\n", " \"Take leading comment directives from lines of code in `ss`, remove `#|`, and split\"\n", " if cell.source:\n", " ss = cell.source.splitlines(True)\n", " first_code = first_code_ln(ss, lang=lang)\n", " if not ss or first_code==0: return {}\n", " pre = ss[:first_code]\n", " if remove:\n", " # Leave Quarto directives and cell magic in place for later processing\n", " cell['source'] = ''.join([_norm_quarto(o, lang) for o in pre if _quarto_re(lang).match(o) or _cell_mgc.match(o)] + ss[first_code:])\n", " return dict(L(_directive(s, lang) for s in pre).filter())" ] }, { "cell_type": "markdown", "id": "d58cda54", "metadata": {}, "source": [ "Comment directives start with `#|`, followed by whitespace delimited tokens, which `extract_directives` extracts from the start of a cell, up until a blank line or a line containing something other than comments. The extracted lines are removed from the source." ] }, { "cell_type": "code", "execution_count": null, "id": "9ec3d86d", "metadata": {}, "outputs": [], "source": [ "exp = AttrDict(source = \"\"\"#|export module\n", "#|eval:false\n", "#| hide\n", "# | foo bar\n", "# |woo: baz\n", "1+2\n", "#bar\"\"\")\n", "test_eq(extract_directives(exp), {'export':['module'], 'hide':[], 'eval:': ['false'], 'foo': ['bar'], 'woo:': ['baz']})\n", "test_eq(exp.source, '#|eval: false\\n# |woo: baz\\n1+2\\n#bar')" ] }, { "cell_type": "code", "execution_count": null, "id": "e03d189c-0629-487c-8cc5-e34268aab5f9", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "exp = AttrDict(source = \"\"\"\n", "⍝|hide\n", "⍝| foo: bar\n", "# |woo: baz\n", "1+2\n", "⍝bar\"\"\")\n", "test_eq(extract_directives(exp, lang='apl'), {'hide': [], 'foo:': ['bar']})" ] }, { "cell_type": "code", "execution_count": null, "id": "e6701805", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def opt_set(var, newval):\n", " \"newval if newval else var\"\n", " return newval if newval else var" ] }, { "cell_type": "code", "execution_count": null, "id": "98c9d556", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def instantiate(x, **kwargs):\n", " \"Instantiate `x` if it's a type\"\n", " return x(**kwargs) if isinstance(x,type) else x\n", "\n", "def _mk_procs(procs, nb): return L(procs).map(instantiate, nb=nb)" ] }, { "cell_type": "code", "execution_count": null, "id": "ab147efe", "metadata": {}, "outputs": [], "source": [ "#|export\n", "def _is_direc(f): return getattr(f, '__name__', '-')[-1]=='_'" ] }, { "cell_type": "code", "execution_count": null, "id": "7c81f109", "metadata": {}, "outputs": [], "source": [ "#|export\n", "class NBProcessor:\n", " \"Process cells and nbdev comments in a notebook\"\n", " def __init__(self, path=None, procs=None, nb=None, debug=False, rm_directives=True, process=False):\n", " self.nb = read_nb(path) if nb is None else nb\n", " self.lang = nb_lang(self.nb)\n", " for cell in self.nb.cells: cell.directives_ = extract_directives(cell, remove=rm_directives, lang=self.lang)\n", " self.procs = _mk_procs(procs, nb=self.nb)\n", " self.debug,self.rm_directives = debug,rm_directives\n", " if process: self.process()\n", "\n", " def _process_cell(self, proc, cell):\n", " if not hasattr(cell,'source'): return\n", " if cell.cell_type=='code' and cell.directives_:\n", " # Option 1: `proc` is directive name with `_` suffix\n", " f = getattr(proc, '__name__', '-').rstrip('_')\n", " if f in cell.directives_: self._process_comment(proc, cell, f)\n", " \n", " # Option 2: `proc` contains a method named `_{directive}_`\n", " for cmd in cell.directives_:\n", " f = getattr(proc, f'_{cmd}_', None)\n", " if f: self._process_comment(f, cell, cmd)\n", " if callable(proc) and not _is_direc(proc): cell = opt_set(cell, proc(cell))\n", "\n", " def _process_comment(self, proc, cell, cmd):\n", " args = cell.directives_[cmd]\n", " if self.debug: print(cmd, args, f)\n", " return proc(cell, *args)\n", " \n", " def _proc(self, proc):\n", " if hasattr(proc,'begin'): proc.begin()\n", " for cell in self.nb.cells: self._process_cell(proc, cell)\n", " if hasattr(proc,'end'): proc.end()\n", " self.nb.cells = [c for c in self.nb.cells if c and getattr(c,'source',None) is not None]\n", " for i,cell in enumerate(self.nb.cells): cell.idx_ = i\n", "\n", " def process(self):\n", " \"Process all cells with all processors\"\n", " for proc in self.procs: self._proc(proc)" ] }, { "cell_type": "markdown", "id": "85874d36", "metadata": {}, "source": [ "Cell processors can be callables (e.g regular functions), in which case they are called for every cell (set a cell's source to `None` to remove the cell):" ] }, { "cell_type": "code", "execution_count": null, "id": "babdd6d0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "---\n", "title: Foo\n", "execute:\n", " echo: false\n", "---\n", "exec(\"o_y=1\")\n", "exec(\"p_y=1\")\n", "_all_ = [o_y, 'p_y']\n" ] } ], "source": [ "everything_fn = '..//tests/01_everything.ipynb'\n", "\n", "def print_execs(cell):\n", " if 'exec' in cell.source: print(cell.source)\n", "\n", "NBProcessor(everything_fn, print_execs).process()" ] }, { "cell_type": "markdown", "id": "a8202589", "metadata": {}, "source": [ "Comment directives are put in a cell attribute `directive_` as a dictionary keyed by directive name:" ] }, { "cell_type": "code", "execution_count": null, "id": "6da4a177", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "['testing']\n" ] } ], "source": [ "def printme_func(cell):\n", " if cell.directives_ and 'printme' in cell.directives_: print(cell.directives_['printme'])\n", "\n", "NBProcessor(everything_fn, printme_func).process()" ] }, { "cell_type": "markdown", "id": "315106a7", "metadata": {}, "source": [ "However, a more convenient way to handle comment directives is to use a *class* as a processor, and include a method in your class with the same name as your directive, surrounded by underscores:" ] }, { "cell_type": "code", "execution_count": null, "id": "060f85c1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "testing\n" ] } ], "source": [ "class _PrintExample:\n", " def _printme_(self, cell, to_print): print(to_print)\n", "\n", "NBProcessor(everything_fn, _PrintExample()).process()" ] }, { "cell_type": "markdown", "id": "15c01642", "metadata": {}, "source": [ "In the case that your processor supports just one comment directive, you can just use a regular function, with the same name as your directive, but with an underscore appended -- here `printme_` is identical to `_PrintExample` above:" ] }, { "cell_type": "code", "execution_count": null, "id": "a9396951", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "testing\n" ] } ], "source": [ "def printme_(cell, to_print): print(to_print)\n", "\n", "NBProcessor(everything_fn, printme_).process()" ] }, { "cell_type": "code", "execution_count": null, "id": "0cad193c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "testing\n" ] } ], "source": [ "NBProcessor(everything_fn, _PrintExample()).process()" ] }, { "cell_type": "code", "execution_count": null, "id": "fa1f8668", "metadata": {}, "outputs": [], "source": [ "#|export\n", "class Processor:\n", " \"Base class for processors\"\n", " def __init__(self, nb): self.nb = nb\n", " def cell(self, cell): pass\n", " def __call__(self, cell): return self.cell(cell)" ] }, { "cell_type": "markdown", "id": "2fb60713", "metadata": {}, "source": [ "For more complex behavior, inherit from `Processor`, and override one of more of `begin()` (called before any cells are processed), `cell()` (called for each cell), and `end()` (called after all cells are processed). You can also include comment directives (such as the `_printme` example above) in these subclasses. Subclasses will automatically have access to `self.nb`, containing the processed notebook." ] }, { "cell_type": "code", "execution_count": null, "id": "4484dfd3", "metadata": {}, "outputs": [], "source": [ "class CountCellProcessor(Processor):\n", " def begin(self):\n", " print(f\"First cell:\\n{self.nb.cells[0].source}\")\n", " self.count=0\n", " def cell(self, cell):\n", " if cell.cell_type=='code': self.count += 1\n", " def end(self): print(f\"* There were {self.count} code cells\")" ] }, { "cell_type": "code", "execution_count": null, "id": "115407c3", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "First cell:\n", "---\n", "title: Foo\n", "execute:\n", " echo: false\n", "---\n", "* There were 26 code cells\n" ] } ], "source": [ "NBProcessor(everything_fn, CountCellProcessor).process()" ] }, { "cell_type": "markdown", "id": "e95db40d", "metadata": {}, "source": [ "## Export -" ] }, { "cell_type": "code", "execution_count": null, "id": "f0d52262", "metadata": {}, "outputs": [], "source": [ "#|hide\n", "from nbdev.maker import _basic_export_nb2" ] }, { "cell_type": "code", "execution_count": null, "id": "4e85652c", "metadata": {}, "outputs": [], "source": [ "#|eval: false\n", "#|hide\n", "basic_export_nb2('01_read.ipynb', 'read')\n", "basic_export_nb2('02_maker.ipynb', 'maker')\n", "basic_export_nb2('03_process.ipynb', 'process')\n", "\n", "g = exec_new('import nbdev.process')\n", "assert hasattr(g['nbdev'].process, 'NBProcessor')" ] }, { "cell_type": "code", "execution_count": null, "id": "af6db102-2447-49ba-94d9-bfebfb48c0f0", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }