In [None]:
#|hide
#|default_exp process

# process
> A notebook processor

In [None]:
#|export
from nbdev.config import *
from nbdev.maker import *
from nbdev.imports import *

from execnb.nbio import *
from fastcore.script import *
from fastcore.imports import *

from collections import defaultdict

In [None]:
#|hide
from fastcore.test import *
from pdb import set_trace
from importlib import reload
from fastcore import shutil

Special comments at the start of a cell can be used to provide information to `nbdev` about how to process a cell, so we need to be able to find the location of these comments.

In [None]:
# Load the minimal test notebook from the tests directory
minimal = read_nb('..//tests/minimal.ipynb')

In [None]:
#|export
# from https://github.com/quarto-dev/quarto-cli/blob/main/src/resources/jupyter/notebook.py
# Comment marker per language; block-comment languages hold an `[open, close]` pair,
# and any language that is not listed falls back to `#`
langs = defaultdict(lambda: '#', {
    'r': "#", 'python': "#", 'julia': "#", 'scala': "//", 'matlab': "%", 'csharp': "//", 'fsharp': "//",
    'c': ["/*","*/"], 'css': ["/*","*/"], 'sas': ["*",";"],
    'powershell': "#", 'bash': "#", 'sql': "--", 'mysql': "--", 'psql': "--",
    'lua': "--", 'cpp': "//", 'cc': "//", 'stan': "#", 'octave': "#",
    'fortran': "!", 'fortran95': "!", 'awk': "#", 'gawk': "#", 'stata': "*",
    'java': "//", 'groovy': "//", 'sed': "#", 'perl': "#", 'ruby': "#", 'tikz': "%",
    'javascript': "//", 'js': "//", 'd3': "//", 'node': "//",
    'sass': "//", 'coffee': "#", 'go': "//", 'asy': "//", 'haskell': "--", 'dot': "//", 'apl': "⍝",
})

In [None]:
#|export
def nb_lang(nb):
    "Language named in `nb.metadata.kernelspec.language`, or `'python'` when that attribute path is missing"
    return nested_attr(nb, 'metadata.kernelspec.language', 'python')

In [None]:
#|hide
# The language comes from each notebook's kernelspec metadata
test_eq(nb_lang(read_nb('..//tests/minimal.ipynb')), 'python')
test_eq(nb_lang(read_nb('..//tests/APL.ipynb')), 'apl')

In [None]:
#|export

def _dir_pre(lang=None):
    "Regex source matching the start of a directive line for `lang`: its comment marker followed by `|`"
    marker = langs[lang]
    # Block-comment languages (e.g. c, css) store an `[open, close]` pair rather than a string.
    # NOTE(review): the opening delimiter is assumed to be what starts a directive -- confirm against Quarto
    if not isinstance(marker, str): marker = marker[0]
    # Escape the marker: some contain regex metacharacters (`*` for stata, `/*` for c)
    return fr"\s*{re.escape(marker)}\s*\|"

def _quarto_re(lang=None):
    "Compiled regex matching a Quarto-style directive prefix (`<marker>| key:`) for `lang`"
    return re.compile(_dir_pre(lang) + r'\s*[\w|-]+\s*:')

In [None]:
#|hide
# A Quarto directive is a `key:` prefix; the key may contain hyphens but no spaces
assert _quarto_re().match('#|code-fold: show')
assert _quarto_re().match('#|hide: true')
assert not _quarto_re().match('#|code fold: show') #not a valid quarto directive

In [None]:
#|export
def _directive(s, lang='python'):
    "Split directive line `s` into `(name, args)`; returns `None` when no tokens remain"
    m = re.match(_dir_pre(lang), s)
    # Drop the matched prefix whatever its length, so multi-character comment markers such as `//`
    # or `--` are removed in full (a fixed two-character slice only suits one-character markers)
    if m: s = s[m.end():]
    # Make sure a colon is followed by whitespace so `key:value` splits into `key:` and `value`
    if ':' in s: s = s.replace(':', ': ')
    # Lines without a directive prefix keep the previous behaviour: their first two characters are dropped
    toks = (s if m else s.strip()[2:]).split()
    if not toks: return None
    direc,*args = toks
    return direc,args

In [None]:
#|export
def _norm_quarto(s, lang='python'):
    "normalize quarto directives so they have a space after the colon"
    m = _quarto_re(lang).match(s)
    # Lines that are not quarto directives are returned untouched
    if not m: return s
    # Rewrite only the leading `key:` prefix, and strip just spaces/tabs after it so that a
    # directive with an empty value keeps its line ending instead of merging with the next line
    return m.group(0) + ' ' + s[m.end():].lstrip(' \t')

In [None]:
#|hide
# Exactly one space follows the colon of a quarto directive; other lines are returned unchanged
test_eq(_norm_quarto('#|foo:bar'), '#|foo: bar')
test_eq(_norm_quarto('#|foo: bar'), '#|foo: bar')
test_eq(_norm_quarto('#|not_quarto'), '#|not_quarto')

In [None]:
#|export
# Matches a cell magic line such as `%%timeit`
_cell_mgc = re.compile(r"^\s*%%\w+")

def first_code_ln(code_list, re_pattern=None, lang='python'):
    "Index of the first line of `code_list` that is not blank, a directive, or a cell magic; `None` if there is none"
    if re_pattern is None: re_pattern = _dir_pre(lang)
    for idx,ln in enumerate(code_list):
        if ln.strip() == '': continue
        if re.match(re_pattern, ln) or _cell_mgc.match(ln): continue
        return idx
    return None

In [None]:
# Blank lines and directive lines are skipped, so `foo` (index 4) is the first line of code
_tst = """ 
#|default_exp
 #|export
#|hide_input
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)), 4)

In [None]:
#|hide

# test for cell magics: a leading `%%magic` line is skipped just like a directive
_tst = """%%timeit
#|hide
 #|export
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)), 3)

# test when there is line magic: a single-`%` line magic counts as code, so the scan stops on it
_tst = """
#|hide
%line_magic
 #|export
foo
"""
test_eq(first_code_ln(_tst.splitlines(True)),2)

In [None]:
#|export
def extract_directives(cell, remove=True, lang='python'):
    "Take leading comment directives from `cell.source`, strip the comment prefix, and return a dict of `{name: args}`"
    # Always hand back a dict, even for a cell with no source
    if not cell.source: return {}
    ss = cell.source.splitlines(True)
    first_code = first_code_ln(ss, lang=lang)
    if not ss or first_code==0: return {}
    # `first_code_ln` gives `None` when the cell holds no code at all: every line is then preamble.
    # Slicing with `None` would instead append the whole cell again after the kept preamble lines.
    if first_code is None: first_code = len(ss)
    pre = ss[:first_code]
    if remove:
        # Leave Quarto directives and cell magic in place for later processing
        kept = [_norm_quarto(o, lang) for o in pre if _quarto_re(lang).match(o) or _cell_mgc.match(o)]
        cell['source'] = ''.join(kept + ss[first_code:])
    # `_directive` returns `None` for lines with no tokens (e.g. blank lines); `filter` drops those
    return dict(L(_directive(s, lang) for s in pre).filter())

Comment directives start with `#|`, followed by whitespace-delimited tokens. `extract_directives` collects them from the start of a cell, skipping blank lines and cell magics, up to the first line that contains anything else. The extracted lines are removed from the source, except for Quarto directives (`#|key: value`) and cell magics, which are left in place for later processing.

In [None]:
# Whitespace is allowed around `#` and `|`; Quarto-style `key: value` lines stay in the source (normalized)
exp = AttrDict(source = """#|export module
#|eval:false
#| hide
# | foo bar
# |woo: baz
1+2
#bar""")
test_eq(extract_directives(exp), {'export':['module'], 'hide':[], 'eval:': ['false'], 'foo': ['bar'], 'woo:': ['baz']})
test_eq(exp.source, '#|eval: false\n# |woo: baz\n1+2\n#bar')

In [None]:
#|hide
# With `lang='apl'` only `⍝|` lines are directives: the `# |woo: baz` line counts as code and ends the scan
exp = AttrDict(source = """
⍝|hide
⍝| foo: bar
# |woo: baz
1+2
⍝bar""")
test_eq(extract_directives(exp, lang='apl'), {'hide': [], 'foo:': ['bar']})

In [None]:
#|export
def opt_set(var, newval):
    "Return `newval` when it is truthy, otherwise fall back to `var`"
    return newval or var

In [None]:
#|export
def instantiate(x, **kwargs):
    "Return `x(**kwargs)` when `x` is a class; anything else is returned unchanged"
    if not isinstance(x, type): return x
    return x(**kwargs)

def _mk_procs(procs, nb):
    "Wrap `procs` in an `L`, passing each item through `instantiate` with `nb=nb`"
    proc_list = L(procs)
    return proc_list.map(instantiate, nb=nb)

In [None]:
#|export
def _is_direc(f): return getattr(f, '__name__', '-')[-1]=='_'

In [None]:
#|export
class NBProcessor:
    "Process cells and nbdev comments in a notebook"
    def __init__(self, path=None, procs=None, nb=None, debug=False, rm_directives=True, process=False):
        # An explicitly supplied `nb` takes precedence; otherwise the notebook is read from `path`
        self.nb = read_nb(path) if nb is None else nb
        self.lang = nb_lang(self.nb)
        # Parse directives once up front so every processor can rely on `cell.directives_`
        for cell in self.nb.cells: cell.directives_ = extract_directives(cell, remove=rm_directives, lang=self.lang)
        self.procs = _mk_procs(procs, nb=self.nb)
        self.debug,self.rm_directives = debug,rm_directives
        if process: self.process()

    def _process_cell(self, proc, cell):
        "Apply `proc` to `cell`: matching directive handlers first, then `proc` itself if it is a plain callable"
        if not hasattr(cell,'source'): return
        if cell.cell_type=='code' and cell.directives_:
            # Option 1: `proc` is directive name with `_` suffix
            f = getattr(proc, '__name__', '-').rstrip('_')
            if f in cell.directives_: self._process_comment(proc, cell, f)

            # Option 2: `proc` contains a method named `_{directive}_`
            for cmd in cell.directives_:
                f = getattr(proc, f'_{cmd}_', None)
                if f: self._process_comment(f, cell, cmd)
        # Processors act on `cell` in place; rebinding the local name to a return value had no effect,
        # so the result of the call is simply ignored
        if callable(proc) and not _is_direc(proc): proc(cell)

    def _process_comment(self, proc, cell, cmd):
        "Call directive handler `proc` with `cell` and the arguments that followed directive `cmd`"
        args = cell.directives_[cmd]
        # Show the handler being invoked (this previously referenced an undefined name and raised `NameError`)
        if self.debug: print(cmd, args, proc)
        return proc(cell, *args)

    def _proc(self, proc):
        "Run one processor over every cell, calling its optional `begin`/`end` hooks around the loop"
        if hasattr(proc,'begin'): proc.begin()
        for cell in self.nb.cells: self._process_cell(proc, cell)
        if hasattr(proc,'end'): proc.end()
        # Drop falsy cells and cells whose source was set to `None`, then renumber what is left
        self.nb.cells = [c for c in self.nb.cells if c and getattr(c,'source',None) is not None]
        for i,cell in enumerate(self.nb.cells): cell.idx_ = i

    def process(self):
        "Process all cells with all processors"
        for proc in self.procs: self._proc(proc)

Cell processors can be callables (e.g. regular functions), in which case they are called for every cell (set a cell's source to `None` to remove the cell):

In [None]:
# Path of the test notebook used by the examples below
everything_fn = '..//tests/01_everything.ipynb'

def print_execs(cell):
    "Print the source of any cell whose source contains `exec`"
    src = cell.source
    if 'exec' not in src: return
    print(src)

# A plain function used as a processor is called once for every cell
NBProcessor(everything_fn, print_execs).process()

---
title: Foo
execute:
 echo: false
---
exec("o_y=1")
exec("p_y=1")
_all_ = [o_y, 'p_y']


Comment directives are put in a cell attribute `directives_` as a dictionary keyed by directive name:

In [None]:
def printme_func(cell):
    "Print the arguments of the cell's `printme` directive, if it has one"
    direcs = cell.directives_
    if not direcs or 'printme' not in direcs: return
    print(direcs['printme'])

# The function inspects `cell.directives_` itself to find the `printme` directive
NBProcessor(everything_fn, printme_func).process()

['testing']


However, a more convenient way to handle comment directives is to use a *class* as a processor, and include a method in your class with the same name as your directive, surrounded by underscores:

In [None]:
class _PrintExample:
 def _printme_(self, cell, to_print): print(to_print)

# `_printme_` is looked up by directive name and receives the directive's arguments
NBProcessor(everything_fn, _PrintExample()).process()

testing


In the case that your processor supports just one comment directive, you can just use a regular function, with the same name as your directive, but with an underscore appended -- here `printme_` is identical to `_PrintExample` above:

In [None]:
def printme_(cell, to_print):
    "Handler for the `printme` directive: print its single argument"
    print(to_print)

# The trailing underscore in the function name marks it as the handler for the `printme` directive
NBProcessor(everything_fn, printme_).process()

testing


In [None]:
# NOTE(review): this repeats the `_PrintExample` call from an earlier cell -- confirm whether it is still needed
NBProcessor(everything_fn, _PrintExample()).process()

testing


In [None]:
#|export
class Processor:
    "Base class for processors"
    def __init__(self, nb):
        self.nb = nb

    def cell(self, cell):
        "Per-cell hook; does nothing unless a subclass overrides it"
        return None

    def __call__(self, cell):
        "Calling the instance forwards to `cell`"
        return self.cell(cell)

For more complex behavior, inherit from `Processor`, and override one or more of `begin()` (called before any cells are processed), `cell()` (called for each cell), and `end()` (called after all cells are processed). You can also include comment directives (such as the `_printme_` example above) in these subclasses. Subclasses will automatically have access to `self.nb`, containing the processed notebook.

In [None]:
class CountCellProcessor(Processor):
    "Example processor: show the first cell's source, then count the code cells"
    def begin(self):
        # Runs before any cell is processed
        print(f"First cell:\n{self.nb.cells[0].source}")
        self.count=0

    def cell(self, cell):
        # Only code cells contribute to the tally
        if cell.cell_type != 'code': return
        self.count += 1

    def end(self):
        # Runs after every cell has been processed
        print(f"* There were {self.count} code cells")

In [None]:
# The class itself is passed here; `NBProcessor` instantiates it with `nb=` set to the notebook
NBProcessor(everything_fn, CountCellProcessor).process()

First cell:
---
title: Foo
execute:
 echo: false
---
* There were 26 code cells


## Export -

In [None]:
#|hide
from nbdev.maker import _basic_export_nb2

In [None]:
#|eval: false
#|hide
# Presumably exports each listed notebook to the named module -- TODO confirm against `nbdev.maker`.
# NOTE(review): the previous cell imports `_basic_export_nb2` (leading underscore) while this cell calls
# `basic_export_nb2` -- confirm which name `nbdev.maker` actually provides.
basic_export_nb2('01_read.ipynb', 'read')
basic_export_nb2('02_maker.ipynb', 'maker')
basic_export_nb2('03_process.ipynb', 'process')

# Check that `nbdev.process` can be imported and exposes `NBProcessor`
g = exec_new('import nbdev.process')
assert hasattr(g['nbdev'].process, 'NBProcessor')