In [1]:
 import ipylintotype, IPython, traitlets, abc, ipykernel, mypy.api, io, contextlib, re, pylint.lint
 from tempfile import TemporaryDirectory
 from pathlib import Path
 get_ipython = IPython.get_ipython


In [2]:
 class Annotator(traitlets.HasTraits):
 mimetype = traitlets.Unicode()
 entry_point = traitlets.Unicode()
 def __call__(Annotator, code, all_code, metadata, *args, **kwargs):
 return Annotator.run(
 code, all_code.get(Annotator.mimetype, []), metadata.get(
 Annotator.mimetype, {}
 ).get(Annotator.entry_point), *args, **kwargs)
 @traitlets.default('entry_point')
 def _default_entry_point(Annotator): return Annotator.__name__
 @abc.abstractmethod
 def run(Annotator):
 raise NotImplementedError()

In [3]:
 class IPythonAnnotator(Annotator):
 mimetype = traitlets.Unicode(default_value='text/x-ipython')

In [4]:
 _re_mypy = (r"(?P.*):(?P\d+):(?P\d+):\s*(?P.*)\s*:(?P.*)")

 class MyPy(IPythonAnnotator):
 args = traitlets.Unicode("--show-column-numbers --show-error-context --follow-imports silent")
 entry_point = traitlets.Unicode(default_value=mypy.__name__)
 def run(MyPy, code, all_code=None, metadata=None, *args):
 args = list(args) + MyPy.args.split() + ["-c", code] 
 out, err, count = mypy.api.run(args)
 lines = [line for line in out.strip().split("\n")]
 matches = [re.match(_re_mypy, line) for line in out.strip().split("\n")]
 matches = [match.groupdict() for match in matches if match]
 return [
 {
 "message": f"""{err["message"]} [mypy]""",
 "severity": err["severity"],
 "from": dict(line=int(err["line"]) - 1, col=int(err["col"]) - 1),
 "to": dict(line=int(err["line"]) - 1, col=int(err["col"])),
 }
 for err in matches
 ]


In [5]:
 class PyLint(IPythonAnnotator):
 entry_point = traitlets.Unicode(default_value=pylint.__name__)
 def run(PyLint, code, all_code=None, metadata=None, *args):
 with TemporaryDirectory() as td:
 tdp = Path(td)
 fn = tdp / "foo.py"
 fn.write_text(code)
 s = io.StringIO()
 with contextlib.redirect_stdout(s), contextlib.redirect_stderr(io.StringIO()):
 try:
 res = pylint.lint.Run([str(fn)])
 except:
 pass
 matches = re.findall(
 r'^(.):\s*(\d+),\s(\d+):\s*(.*?)\s*\((.*)\)$', 
 s.getvalue(),
 flags=re.M
 )


 return [
 {
 "message": msg,
 "severity": {
 "W": "warning",
 #"C": "convention"
 }.get(severity, "error"),
 "from": dict(line=int(line) - 1, col=int(col)),
 "to": dict(line=int(line) - 1, col=int(col) + 1),
 }
 for severity, line, col, msg, rule in matches
 ]

In [6]:
 class AnnotationFormatter(IPython.core.interactiveshell.InteractiveShell):
 formatters = traitlets.List()
 
 comm_name = traitlets.Unicode(default_value=ipylintotype._version.__comm__)
 current_comm = traitlets.Any()
 def init_user_ns(AnnotationFormatter): ...
 def __init__(AnnotationFormatter, *args, **kwargs):
 if 'parent' in kwargs: kwargs = {**kwargs['parent']._trait_values, **kwargs}
 super().__init__(*args, **kwargs)
 AnnotationFormatter.init_comm()
 
 def init_comm(AnnotationFormatter):
 AnnotationFormatter.close()
 AnnotationFormatter.current_comm = ipykernel.comm.Comm(target_name=AnnotationFormatter.comm_name)
 AnnotationFormatter.current_comm.on_msg(AnnotationFormatter.on_msg)
 __import__('atexit').register(AnnotationFormatter.close)

# def __call__(AnnotationFormatter, code, *args, all_code=None, metadata=None):
# result = dict()
# for formatter in AnnotationFormatter.formatters:
# if formatter.mimetype in all_code:
# result.setdefault(formatter.mimetype, list).extend(
# formatter(code, *args, all_code=all_code, metadata=metadata))
# return result

 def __call__(AnnotationFormatter, code, all_code=None, metadata=None, *args):
 result = []
 code = AnnotationFormatter.transform_cell(code)
 for formatter in AnnotationFormatter.formatters:
 AnnotationFormatter.log.warn("CALL %s", args)
 result.extend(formatter(code, all_code=all_code, metadata=metadata, *args))
 return result

 def close(AnnotationFormatter): 
 if AnnotationFormatter.current_comm: AnnotationFormatter.current_comm.close()
 AnnotationFormatter.current_comm = None

 def on_msg(AnnotationFormatter, msg):
 AnnotationFormatter.log.warn("MSG %s", msg)
 data = msg["content"]["data"]
 annotations = AnnotationFormatter(data.get("code", ""), all_code={}, metadata={})
 AnnotationFormatter.log.warn("ANNO %s", annotations)
 AnnotationFormatter.current_comm.send(dict(id=data["id"], annotations=annotations))

In [7]:
 FORMATTER = None
 def load_ipython_extension(shell):
 global FORMATTER
 unload_ipython_extension(shell)
 FORMATTER = AnnotationFormatter(formatters=[MyPy(), PyLint()], parent=get_ipython()) 
 def unload_ipython_extension(shell):
 global FORMATTER
 if FORMATTER: FORMATTER.close()
 if __name__ == '__main__':
 load_ipython_extension(get_ipython())