#!/usr/bin/env python3 """neorev — human review for agent diffs.""" from __future__ import annotations import argparse import base64 import binascii import enum import os import re import select import signal import subprocess import sys import tempfile import termios import tty from collections import Counter from contextlib import contextmanager from dataclasses import dataclass, field from io import StringIO from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: from collections.abc import Callable, Iterator from typing import IO, Self CSI = "\033[" BOLD = f"{CSI}1m" DIM = f"{CSI}2m" RESET = f"{CSI}0m" GREEN = f"{CSI}32m" RED = f"{CSI}31m" YELLOW = f"{CSI}33m" CYAN = f"{CSI}36m" BG_BLUE = f"{CSI}44m" BG_CHROME = f"{CSI}40m" BLACK = f"{CSI}30m" WHITE = f"{CSI}37m" BG_CYAN = f"{CSI}46m" BG_YELLOW_BLACK = f"{CSI}43;30m" REVERSE = f"{CSI}7m" ANSI_ESCAPE_TEXT_RE = re.compile(r"\x1b\[[0-9;]*[a-zA-Z]") HUNK_BODY_MAX_LINES = 10 HUNK_BODY_CONTEXT_LINES = HUNK_BODY_MAX_LINES // 2 BITS_PER_BYTE = 8 DRAIN_READ_SIZE = 256 ESC_SEQUENCE_TIMEOUT = 0.05 CHROME_ROWS = 3 # top bar + markers + footer MIN_VISIBLE_ROWS = 3 MARKER_WIDTH = 3 # each marker is 3 visible chars (e.g. "[✓]" or " · ") ARROW_WIDTH = 2 # "◀ " or " ▶" SCROLL_INDICATOR_ROWS = 2 # up + down indicators MIN_TERMINAL_WIDTH = 20 MIN_TERMINAL_HEIGHT = CHROME_ROWS + MIN_VISIBLE_ROWS TRUNCATION_ELLIPSIS = "…" RESIZE_DEBOUNCE_TIMEOUT = 0.05 SELECT_IMMEDIATE = 0 GLOBAL_TARGET = "global" LINE_PICKER_SELECT_KEY = "\r" EDITOR_CONTEXT_RADIUS = 3 EDITOR_HUNK_CONTEXT_MAX = 15 EDITOR_TARGET_MARKER = "►" EDITOR_CONTEXT_PAD = " " NOTE_TARGET_HUNK = "hunk" NOTE_PANEL_CURSOR = "▶" NOTE_PANEL_CHROME_ROWS = 2 # header + footer NOTE_PANEL_MIN_HEIGHT = 3 # chrome + at least 1 note row NOTE_PANEL_DIFF_CHROME_ROWS = 2 # top bar + progress markers (no footer) NOTE_PANEL_MAX_FRACTION = 3 # panel takes at most 1/N of terminal height ALL_CLEAR_MESSAGE = "All chunks approved, nothing to report.\n" PREVIEW_INTERPUNCT = " · " XDG_STATE_HOME_DEFAULT = ".local/state" NEOREV_STATE_DIR = "neorev" DEFAULT_OUTPUT_STEM = "review" DEFAULT_OUTPUT_EXT = ".md" DEFAULT_OUTPUT_SEPARATOR = "-" JJ_DEFAULT_REV = "@" GIT_DEFAULT_REV = "HEAD" JJ_DEFAULT_WORKSPACE = "default" QUESTION_ICON = "󰠗" FLAG_ICON = "󰅽" MARGIN_WIDTH = 2 # margin column chars (marker + space) MARGIN_EMPTY = b" " ANSI_ESCAPE_BYTES_RE = re.compile(rb"\x1b\[[0-9;]*[a-zA-Z]") HUNK_RANGE_LINE_RE = re.compile( r"^@@ -(?P\d+)(?:,\d+)? \+(?P\d+)(?:,\d+)? @@" ) SECTION_HEADER_RE = re.compile( r"\[(CHANGE REQUESTED|QUESTION)\]\s+`(.+?)(?:\s+@\s+(.+?))?`$", re.MULTILINE, ) FOOTER_RE = re.compile( r"^$", re.MULTILINE, ) LINE_PICKER_FOOTER_SEGMENTS = [ ("j/k", "move"), ("Enter", "select line"), ("h", "whole hunk"), ("Esc", "cancel"), ] OUTPUT_PREAMBLE = ( "# Code Review\n\n" "## Instructions\n\n" "- `CHANGE REQUESTED` → apply the requested change.\n" "- `QUESTION` → answer the question; do NOT change code" " unless your answer warrants it.\n" "- `global` notes do not apply to a specific part of the diff." " They may refer to the change as a whole, or to something" " outside of it.\n" "- A `diff` block is review context, not an appliable patch.\n" "- `# ...` inside a `diff` block means lines from the same" " hunk were omitted.\n" ) class NoDiffOnStdinError(Exception): """Raised when stdin is a TTY and no diff data is available.""" class Vcs(enum.StrEnum): """Supported version control backends.""" JUJUTSU = "jj" GIT = "git" class Status(enum.Enum): """Enumerate high-level hunk review statuses.""" APPROVED = enum.auto() FLAG = enum.auto() QUESTION = enum.auto() def __str__(self) -> str: """Return the ANSI-styled status label for UI chrome.""" if self is Status.APPROVED: return f"{GREEN}✓ approved{RESET}" if self is Status.FLAG: return f"{RED}✗ change requested{RESET}" return f"{YELLOW}? question{RESET}" class NoteKind(enum.Enum): """Enumerate note categories used in reviews — each value is its output label.""" FLAG = "CHANGE REQUESTED" QUESTION = "QUESTION" @property def icon(self) -> str: """Return the Nerd Font icon for this note kind.""" if self is NoteKind.FLAG: return FLAG_ICON return QUESTION_ICON @property def editor_label(self) -> str: """Return the prompt label shown when editing a hunk-scoped note.""" if self is NoteKind.FLAG: return "flag (explain what change is needed)" return "question (agent will answer, not change code)" @property def global_editor_label(self) -> str: """Return the prompt label shown when editing a global note.""" if self is NoteKind.FLAG: return "global change request" return "global question (agent answers, no code change)" class LineSide(enum.Enum): """Enumerate whether a note targets an added or removed line.""" ADDED = "+" REMOVED = "-" class DisplayLineKind(enum.Enum): """Enumerate logical diff row types — each value is the unified-diff prefix.""" ADDED = "+" REMOVED = "-" CONTEXT = " " META = "\\" class NoteScope(enum.Enum): """Enumerate where a note is attached in the review model.""" GLOBAL = enum.auto() HUNK = enum.auto() LINE = enum.auto() @dataclass class Hunk: """A single diff hunk with optional review annotation.""" file_header: str range_line: str body: str raw: str file_path: str = "" start_line: int = 0 approved: bool = False notes: list[HunkNote] = field(default_factory=list) display_lines: list[DisplayLine] = field(default_factory=list) @property def short_location(self) -> str: """Return a compact 'file:line' location string.""" return ( f"{self.file_path}:{self.start_line}" if self.file_path else self.range_line.strip() ) @property def summary_status(self) -> Status | None: """Return a summary status for chrome/markers.""" if self.approved: return Status.APPROVED if self.notes: if any(note.kind is NoteKind.FLAG for note in self.notes): return Status.FLAG if any(note.kind is NoteKind.QUESTION for note in self.notes): return Status.QUESTION return None @property def is_handled(self) -> bool: """Return whether the hunk has been reviewed.""" return self.approved or bool(self.notes) def get_note(self, target: NoteTarget) -> HunkNote | None: """Find a note matching *target*, or None.""" return next((n for n in self.notes if n.target == target), None) def upsert_note(self, kind: NoteKind, target: NoteTarget, text: str) -> None: """Insert or update a note for *target*.""" for i, note in enumerate(self.notes): if note.target == target: self.notes[i] = HunkNote(kind=kind, target=target, text=text) return self.notes.append(HunkNote(kind=kind, target=target, text=text)) def remove_note(self, target: NoteTarget) -> None: """Remove the note matching *target*, if any.""" for i, note in enumerate(self.notes): if note.target == target: del self.notes[i] return def build_hunk_context(self, scroll_offset: int) -> list[str]: """Build a diff snippet for the editor starting from *scroll_offset*.""" start = max(0, scroll_offset) end = min(len(self.display_lines), start + EDITOR_HUNK_CONTEXT_MAX) lines: list[str] = [] for dl in self.display_lines[start:end]: line_num = dl.new_line_number or dl.old_line_number or 0 lines.append( f"# {EDITOR_CONTEXT_PAD} {line_num:>4} {dl.kind.value} {dl.text}" ) return lines def build_line_context(self, target: LineTarget) -> list[str]: """Build context lines around *target* for the editor comment template.""" target_idx = next( (i for i, dl in enumerate(self.display_lines) if dl.target == target), None, ) if target_idx is None: return [] start = max(0, target_idx - EDITOR_CONTEXT_RADIUS) end = min(len(self.display_lines), target_idx + EDITOR_CONTEXT_RADIUS + 1) lines: list[str] = [] for dl in self.display_lines[start:end]: line_num = dl.new_line_number or dl.old_line_number or 0 marker = EDITOR_TARGET_MARKER if dl.target == target else EDITOR_CONTEXT_PAD lines.append(f"# {marker} {line_num:>4} {dl.kind.value} {dl.text}") return lines @dataclass class GlobalNote: """A review annotation not tied to any specific hunk.""" kind: NoteKind text: str @dataclass(frozen=True) class LineTarget: """A note target tied to one added or removed line.""" side: LineSide line_number: int def __str__(self) -> str: """Return the serialized form for output format.""" return f"{self.side.value}{self.line_number}" @dataclass(frozen=True) class HunkTarget: """A note target tied to a whole diff hunk.""" def __str__(self) -> str: """Return the serialized form for output format.""" return NOTE_TARGET_HUNK NoteTarget = HunkTarget | LineTarget @dataclass class HunkNote: """A review note attached to one target inside a hunk.""" kind: NoteKind target: NoteTarget text: str @dataclass(frozen=True) class DisplayLine: """One parsed logical diff row from a hunk body.""" kind: DisplayLineKind text: str old_line_number: int | None new_line_number: int | None target: LineTarget | None @dataclass class ReviewState: """Mutable state for the interactive review session.""" hunks: list[Hunk] global_notes: list[GlobalNote] current_index: int = 0 scroll_offset: int = 0 def find_next_unhandled(self) -> int: """Find the next hunk with no review, wrapping around.""" n = len(self.hunks) for offset in range(1, n + 1): candidate = (self.current_index + offset) % n if not self.hunks[candidate].is_handled: return candidate return self.current_index @staticmethod def initial_index(hunks: list[Hunk]) -> int: """Return the index of the first unhandled hunk, or 0.""" return next((i for i, h in enumerate(hunks) if not h.is_handled), 0) def navigate(self, key: str) -> bool: """Handle navigation keys (j/k/arrows), returning True if index changed.""" if key in ("j", "down") and self.current_index < len(self.hunks) - 1: self.current_index += 1 return True if key in ("k", "up") and self.current_index > 0: self.current_index -= 1 return True return False def approve(self) -> None: """Toggle approval on the current hunk, ignoring hunks that have notes.""" hunk = self.hunks[self.current_index] if hunk.approved: hunk.approved = False elif not hunk.notes: hunk.approved = True self.current_index = self.find_next_unhandled() def approve_file(self) -> None: """Approve all note-free hunks in the same file as the current hunk.""" current_file = self.hunks[self.current_index].file_path for hunk in self.hunks: if hunk.file_path == current_file and not hunk.approved and not hunk.notes: hunk.approved = True self.current_index = self.find_next_unhandled() def managed_note_refs(self) -> list[ManagedNoteRef]: """Build a flat list of note references for the manage-notes panel.""" refs: list[ManagedNoteRef] = [] for hunk in self.hunks: for note in hunk.notes: label = f"{hunk.short_location} @ {note.target}" refs.append( ManagedNoteRef(scope_label=label, text=note.text, kind=note.kind) ) refs.extend( ManagedNoteRef(scope_label=GLOBAL_TARGET, text=gnote.text, kind=gnote.kind) for gnote in self.global_notes ) return refs def find_note_by_ref( self, ref: ManagedNoteRef, ) -> tuple[Hunk, HunkNote] | GlobalNote | None: """Locate the note described by *ref*.""" for hunk in self.hunks: for note in hunk.notes: label = f"{hunk.short_location} @ {note.target}" if ( label == ref.scope_label and note.text == ref.text and note.kind == ref.kind ): return hunk, note for gnote in self.global_notes: if ( gnote.text == ref.text and gnote.kind == ref.kind and ref.scope_label == GLOBAL_TARGET ): return gnote return None @dataclass(frozen=True) class DiffViewport: """Describes which diff lines are visible in the current terminal viewport.""" scroll_offset: int visible_line_count: int total_lines: int can_scroll_up: bool can_scroll_down: bool lines_above: int lines_below: int @dataclass(frozen=True) class ManagedNoteRef: """A reference to one note for the manage-notes panel.""" scope_label: str text: str kind: NoteKind @dataclass class NotePanelState: """Cursor and scroll state for the manage-notes panel.""" cursor: int = 0 scroll_offset: int = 0 def parse_diff(diff_text: str) -> list[Hunk]: """Parse a unified diff into a list of reviewable hunks.""" hunks: list[Hunk] = [] lines = diff_text.splitlines(keepends=True) current_file_header = "" current_file_path = "" i = 0 while i < len(lines): line = lines[i] if line.startswith(("diff --git", "diff ")): current_file_header, current_file_path, i = parse_file_header( lines, i, current_file_path, ) continue if line.startswith("@@"): hunk, i = parse_hunk_body(lines, i, current_file_header, current_file_path) hunks.append(hunk) continue i += 1 return hunks def parse_file_header( lines: list[str], start: int, current_file_path: str, ) -> tuple[str, str, int]: """ Parse a diff file header block starting at *start*. Returns (file_header, file_path, next_line_index). """ header_lines = [lines[start]] i = start + 1 file_path = current_file_path while ( i < len(lines) and not lines[i].startswith("@@") and not lines[i].startswith("diff ") ): header_lines.append(lines[i]) if lines[i].startswith("+++ "): path = lines[i][4:].strip() path = path.removeprefix("b/") file_path = path i += 1 file_header = "".join(header_lines).rstrip("\n") return file_header, file_path, i def parse_hunk_body( lines: list[str], start: int, file_header: str, file_path: str, ) -> tuple[Hunk, int]: """ Parse a single @@ hunk starting at *start*. Returns (Hunk, next_line_index). """ range_line = lines[start].rstrip("\n") match = re.search(r"\+(\d+)", range_line) start_line = int(match.group(1)) if match else 0 body_lines: list[str] = [] i = start + 1 while ( i < len(lines) and not lines[i].startswith("@@") and not lines[i].startswith("diff ") ): body_lines.append(lines[i]) i += 1 body = "".join(body_lines).rstrip("\n") raw = ( f"{file_header}\n{range_line}\n{body}" if file_header else f"{range_line}\n{body}" ) hunk = Hunk( file_header=file_header, range_line=range_line, body=body, raw=raw, file_path=file_path, start_line=start_line, display_lines=parse_display_lines(range_line, body), ) return hunk, i def parse_display_lines(range_line: str, body: str) -> list[DisplayLine]: """Parse a hunk body into structured display rows with line numbers.""" match = HUNK_RANGE_LINE_RE.match(range_line) if not match: return [] old_num = int(match.group("old_start")) new_num = int(match.group("new_start")) result: list[DisplayLine] = [] for line in body.splitlines(): if not line: continue try: line_kind = DisplayLineKind(line[0]) except ValueError: line_kind = DisplayLineKind.CONTEXT if line_kind is DisplayLineKind.META: result.append( DisplayLine( kind=line_kind, text=line, old_line_number=None, new_line_number=None, target=None, ) ) continue text = line[1:] if len(line) > 1 else "" target: LineTarget | None = None if line_kind is DisplayLineKind.ADDED: target = LineTarget(side=LineSide.ADDED, line_number=new_num) elif line_kind is DisplayLineKind.REMOVED: target = LineTarget(side=LineSide.REMOVED, line_number=old_num) result.append( DisplayLine( kind=line_kind, text=text, old_line_number=( old_num if line_kind is not DisplayLineKind.ADDED else None ), new_line_number=( new_num if line_kind is not DisplayLineKind.REMOVED else None ), target=target, ) ) if line_kind is not DisplayLineKind.ADDED: old_num += 1 if line_kind is not DisplayLineKind.REMOVED: new_num += 1 return result def parse_note_target(text: str) -> NoteTarget | None: """Parse a target string back into a NoteTarget.""" text = text.strip() if text == NOTE_TARGET_HUNK: return HunkTarget() if text: try: side = LineSide(text[0]) except ValueError: return None try: line_number = int(text[1:]) return LineTarget(side=side, line_number=line_number) except ValueError: return None return None def render_through_delta(diff_text: str, width: int = 120) -> bytes: """Pipe *diff_text* through ``delta`` and return raw bytes (with ANSI).""" try: result = subprocess.run( [ "delta", "--width", str(width), "--file-style", "omit", "--hunk-header-style", "omit", "--paging", "never", ], input=diff_text.encode(), capture_output=True, timeout=5, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout except (FileNotFoundError, subprocess.TimeoutExpired): pass return diff_text.encode() def format_output( hunks: list[Hunk], global_notes: list[GlobalNote], diff_source: str = "", ) -> str: """ Produce compact, unambiguous review output for an LLM agent. Approved hunks are omitted. Each note gets its own section with an explicit target (hunk, +N, -N). Global notes appear first. When *diff_source* is non-empty it is included in the header so the agent knows how to retrieve the reviewed diff. """ out = StringIO() all_notes = [(hunk, note) for hunk in hunks for note in hunk.notes] if not all_notes and not global_notes: approved = sum(1 for h in hunks if h.approved) out.write("# Review: all clear, no action needed.\n") if diff_source: out.write(f"# Reviewed diff: `{diff_source}`\n") out.write(f"# {approved}/{len(hunks)} hunks approved.\n") out.write(format_footer(hunks)) return out.getvalue() out.write(OUTPUT_PREAMBLE) if diff_source: out.write(f"- Reviewed diff: `{diff_source}`\n") for note in global_notes: write_note_section(out, note.kind, GLOBAL_TARGET, note.text) for hunk, hnote in all_notes: write_note_section( out, hnote.kind, f"{hunk.file_path} @ {hnote.target}", hnote.text, diff_block=format_diff_block(hunk, hnote), ) out.write(format_footer(hunks)) return out.getvalue() def write_note_section( out: StringIO, kind: NoteKind, heading: str, text: str, diff_block: str | None = None, ) -> None: """Write a single note section (global or hunk-scoped) to *out*.""" out.write(f"\n### [{kind.value}] `{heading}`\n\n") if diff_block: out.write(diff_block) out.write(text) out.write("\n") def format_diff_block(hunk: Hunk, note: HunkNote) -> str: """Format the fenced diff block for a hunk note section.""" trimmed = trim_body_lines(hunk, note) return f"```diff\n{hunk.range_line}\n" + "\n".join(trimmed) + "\n```\n\n" def format_footer(hunks: list[Hunk]) -> str: """Format the HTML comment footer with the approved-hunk bitmap.""" return f"\n\n" def trim_body_lines(hunk: Hunk, note: HunkNote) -> list[str]: """Trim long hunk body lines, centering on the target line when applicable.""" body_lines = hunk.body.splitlines() if len(body_lines) <= HUNK_BODY_MAX_LINES: return body_lines target_index: int | None = None if isinstance(note.target, LineTarget): body_indices = [i for i, line in enumerate(body_lines) if line] for j, dl in enumerate(hunk.display_lines): if dl.target == note.target and j < len(body_indices): target_index = body_indices[j] break if target_index is None: return [ *body_lines[:HUNK_BODY_CONTEXT_LINES], "# ...", *body_lines[-HUNK_BODY_CONTEXT_LINES:], ] half = HUNK_BODY_MAX_LINES // 2 start = max(0, min(target_index - half, len(body_lines) - HUNK_BODY_MAX_LINES)) end = min(len(body_lines), start + HUNK_BODY_MAX_LINES) trimmed: list[str] = [] if start > 0: trimmed.append("# ...") trimmed.extend(body_lines[start:end]) if end < len(body_lines): trimmed.append("# ...") return trimmed def bitmap_byte_length(num_hunks: int) -> int: """Return the number of bytes needed to store *num_hunks* approval bits.""" return (num_hunks + BITS_PER_BYTE - 1) // BITS_PER_BYTE def encode_approved_bitmap(hunks: list[Hunk]) -> str: """Encode the approved/pending state of *hunks* as a base64 string.""" n = bitmap_byte_length(len(hunks)) bits = sum(1 << i for i, h in enumerate(hunks) if h.approved) return base64.b64encode(bits.to_bytes(n, "little")).decode("ascii") def decode_approved_bitmap(encoded: str, num_hunks: int) -> list[bool]: """ Decode a base64 bitmap into a list of booleans (one per hunk). Returns an empty list if *encoded* is invalid or the length does not match *num_hunks*. """ try: raw = base64.b64decode(encoded) except (ValueError, binascii.Error): return [] if len(raw) != bitmap_byte_length(num_hunks): return [] bits = int.from_bytes(raw, "little") return [bool(bits & (1 << i)) for i in range(num_hunks)] def load_previous_review( path: str, ) -> tuple[ dict[tuple[str, str, NoteTarget], tuple[NoteKind, str]], list[GlobalNote], str ]: """ Parse a previous review output file. Returns a mapping of ``(file_path, range_line, target) → (kind, text)`` for hunk annotations, a list of ``GlobalNote`` for global ones, and the raw approved-bitmap string (empty if absent). """ try: text = Path(path).read_text() except (FileNotFoundError, OSError): return {}, [], "" if not text.strip(): return {}, [], "" annotations: dict[tuple[str, str, NoteTarget], tuple[NoteKind, str]] = {} global_notes: list[GlobalNote] = [] bitmap_match = FOOTER_RE.search(text) bitmap_encoded = bitmap_match.group(1) if bitmap_match else "" sections = re.split(r"^### ", text, flags=re.MULTILINE) for section in sections: parse_review_section(section, annotations, global_notes) return annotations, global_notes, bitmap_encoded def parse_review_section( section: str, annotations: dict[tuple[str, str, NoteTarget], tuple[NoteKind, str]], global_notes: list[GlobalNote], ) -> None: """Parse one ``### ...`` section and populate *annotations* or *global_notes*.""" if not section.strip(): return header_match = SECTION_HEADER_RE.match(section) if not header_match: return action_str = header_match.group(1) file_or_global = header_match.group(2).strip() target_str = header_match.group(3) kind = NoteKind(action_str) comment = extract_comment_lines(section) scope = NoteScope.GLOBAL if file_or_global == GLOBAL_TARGET else NoteScope.HUNK if scope is NoteScope.GLOBAL: if comment: global_notes.append(GlobalNote(kind=kind, text=comment)) return range_match = re.search(r"```diff\n(@@[^\n]+)", section) if not range_match: return range_line = range_match.group(1) target: NoteTarget = HunkTarget() if target_str: parsed = parse_note_target(target_str.strip()) if parsed is not None: target = parsed annotations[(file_or_global, range_line, target)] = (kind, comment) def extract_comment_lines(section: str) -> str: """Extract the review message text from a section body.""" body = re.sub(r"```diff\n.*?```\n?", "", section, flags=re.DOTALL) body = SECTION_HEADER_RE.sub("", body) body = FOOTER_RE.sub("", body) return body.strip() def apply_previous_review( hunks: list[Hunk], annotations: dict[tuple[str, str, NoteTarget], tuple[NoteKind, str]], ) -> int: """ Apply previously saved annotations to *hunks*. Returns the number of annotations that matched. """ lookup: dict[tuple[str, str], Hunk] = {} for hunk in hunks: lookup[(hunk.file_path, hunk.range_line)] = hunk matched = 0 for (file_path, range_line, target), (kind, text) in annotations.items(): hunk = lookup.get((file_path, range_line)) if hunk is None: continue hunk.upsert_note(kind, target, text) matched += 1 return matched def write_comment_template( file_obj: IO[str], location: str, existing: str, context_lines: list[str] | None = None, ) -> int: """ Write the comment template into *file_obj*. Returns the editor jump-line number (1-indexed). """ file_obj.write(f"# Comment for {location}\n") file_obj.write("# Lines starting with # are stripped.\n") header_count = 2 if context_lines: file_obj.write("#\n") file_obj.writelines(f"{ctx}\n" for ctx in context_lines) file_obj.write("#\n") header_count += len(context_lines) + 2 file_obj.write("\n") if existing: file_obj.write(existing) file_obj.write("\n") content_lines = existing.count("\n") + 1 if existing else 1 return header_count + 1 + content_lines # header + blank + content def read_comment_file(path: str) -> str: """Read a comment file, stripping lines that start with ``#``.""" with open(path) as f: lines = [line for line in f if not line.startswith("#")] return "".join(lines).strip() @contextmanager def duplicated_fd(fd: int) -> Iterator[int]: """Yield a duplicate of *fd* and close it automatically.""" dup_fd = os.dup(fd) try: yield dup_fd finally: os.close(dup_fd) @contextmanager def nonblocking_pipe() -> Iterator[tuple[int, int]]: """Yield a non-blocking pipe pair and close both ends automatically.""" read_fd, write_fd = os.pipe() try: os.set_blocking(read_fd, False) os.set_blocking(write_fd, False) yield read_fd, write_fd finally: os.close(read_fd) os.close(write_fd) def visible_len(text: str) -> int: """Return the visible character count of *text* (ignoring ANSI escapes).""" return len(ANSI_ESCAPE_TEXT_RE.sub("", text)) def truncate_ansi_text(text: str, max_visible: int) -> str: """Truncate *text* to *max_visible* visible chars, preserving ANSI escapes.""" if max_visible <= 0: return "" if visible_len(text) <= max_visible: return text budget = max_visible - len(TRUNCATION_ELLIPSIS) if budget <= 0: return TRUNCATION_ELLIPSIS[:max_visible] result: list[str] = [] visible_count = 0 pos = 0 while pos < len(text) and visible_count < budget: m = ANSI_ESCAPE_TEXT_RE.match(text, pos) if m: result.append(m.group()) pos = m.end() else: result.append(text[pos]) visible_count += 1 pos += 1 result.append(RESET) result.append(TRUNCATION_ELLIPSIS) return "".join(result) def wrap_plain_text(text: str, max_width: int) -> list[str]: """Word-wrap plain *text* (no ANSI) to *max_width* chars per line.""" if len(text) <= max_width: return [text] words = text.split() lines: list[str] = [] current = "" for word in words: candidate = f"{current} {word}" if current else word if len(candidate) > max_width and current: lines.append(current) current = word else: current = candidate if current: lines.append(current) return lines def update_active_sgr(esc: str, active_sgr: list[str]) -> None: """Track active SGR sequences, clearing on reset.""" if esc.endswith("m"): if esc in (f"{CSI}0m", f"{CSI}m"): active_sgr.clear() else: active_sgr.append(esc) def wrap_ansi_line_to_rows(line: bytes, term_width: int) -> list[bytes]: """Split one ANSI-colored line into terminal-width display rows.""" if term_width <= 1: return [line] text = line.decode("utf-8", errors="replace") chunks: list[str] = [] current: list[str] = [] visible = 0 active_sgr: list[str] = [] def flush_chunk() -> None: nonlocal current, visible if active_sgr: current.append(RESET) chunks.append("".join(current)) current = list(active_sgr) visible = 0 def append_plain(plain: str) -> None: nonlocal visible for ch in plain: if visible >= term_width: flush_chunk() current.append(ch) visible += 1 pos = 0 for match in ANSI_ESCAPE_TEXT_RE.finditer(text): append_plain(text[pos : match.start()]) esc = match.group(0) current.append(esc) update_active_sgr(esc, active_sgr) pos = match.end() append_plain(text[pos:]) if current or not chunks: chunks.append("".join(current)) return [chunk.encode("utf-8", errors="replace") for chunk in chunks] def build_margin_markers( hunk: Hunk, selected_target: LineTarget | None = None, ) -> tuple[list[bytes] | None, int | None]: """ Build margin marker bytes and optional highlight index. Returns ``(None, None)`` when the hunk has no line notes and no *selected_target*. When *selected_target* is given the markers are always built so the line picker can display them. """ has_line_notes = any(isinstance(n.target, LineTarget) for n in hunk.notes) if not has_line_notes and selected_target is None: return None, None markers: list[bytes] = [] highlight_index: int | None = None for i, dl in enumerate(hunk.display_lines): if selected_target is not None and dl.target == selected_target: highlight_index = i markers.append(MARGIN_EMPTY) elif dl.target and (note := hunk.get_note(dl.target)): markers.append(f"{note.kind.icon} ".encode()) else: markers.append(MARGIN_EMPTY) return markers, highlight_index def initial_line_picker_cursor( selectable: list[DisplayLine], hunk: Hunk, scroll_offset: int, ) -> int: """Return the selectable index of the first line at or after *scroll_offset*.""" for cursor, dl in enumerate(selectable): idx = next((i for i, d in enumerate(hunk.display_lines) if d is dl), None) if idx is not None and idx >= scroll_offset: return cursor return 0 def build_display_lines( delta_output: bytes, term_width: int, margin_markers: list[bytes] | None = None, highlight_index: int | None = None, highlight_escape: bytes = REVERSE.encode(), ) -> list[bytes]: """Return physical terminal rows for rendered diff output.""" raw_lines = delta_output.split(b"\n") # Delta output ends with a trailing newline and may start with a leading # blank line; strip both to avoid visual gaps in the chrome. if raw_lines and raw_lines[-1] == b"": raw_lines = raw_lines[:-1] if raw_lines and raw_lines[0] == b"": raw_lines = raw_lines[1:] wrap_width = term_width - MARGIN_WIDTH if margin_markers else term_width display_lines: list[bytes] = [] for i, line in enumerate(raw_lines): highlighted = highlight_index is not None and i == highlight_index if highlighted: plain = ANSI_ESCAPE_BYTES_RE.sub(b"", line) wrapped = wrap_ansi_line_to_rows(plain, wrap_width) else: wrapped = wrap_ansi_line_to_rows(line, wrap_width) if margin_markers: margin = margin_markers[i] if i < len(margin_markers) else MARGIN_EMPTY for j, segment in enumerate(wrapped): prefix = margin if j == 0 else MARGIN_EMPTY row = prefix + segment if highlighted: row_text = row.decode("utf-8", errors="replace") pad = max(0, term_width - visible_len(row_text)) row = highlight_escape + row + b" " * pad + RESET.encode() display_lines.append(row) elif highlighted: display_lines.extend( highlight_escape + seg + b" " * max( 0, term_width - visible_len(seg.decode("utf-8", errors="replace")), ) + RESET.encode() for seg in wrapped ) else: display_lines.extend(wrapped) return display_lines or [b""] def compute_diff_viewport( total_lines: int, term_height: int, scroll_offset: int, ) -> DiffViewport: """ Compute which diff lines are visible given the terminal geometry. The fixed chrome occupies ``CHROME_ROWS`` rows (top bar + markers + footer). Scroll indicators are reserved only when actually needed. """ max_avail = term_height - CHROME_ROWS if total_lines <= max_avail: scroll_offset = 0 can_scroll_up = False can_scroll_down = False visible_count = total_lines else: max_offset = max(0, total_lines - (max_avail - SCROLL_INDICATOR_ROWS)) scroll_offset = min(scroll_offset, max_offset) can_scroll_up = scroll_offset > 0 rows_with_both = max_avail - (1 if can_scroll_up else 0) - 1 available_rows = max(rows_with_both, MIN_VISIBLE_ROWS) visible_count = min(available_rows, total_lines - scroll_offset) can_scroll_down = scroll_offset + visible_count < total_lines if not can_scroll_down: rows_without_down = max_avail - (1 if can_scroll_up else 0) available_rows = max(rows_without_down, MIN_VISIBLE_ROWS) visible_count = min(available_rows, total_lines - scroll_offset) lines_above = scroll_offset lines_below = total_lines - scroll_offset - visible_count return DiffViewport( scroll_offset=scroll_offset, visible_line_count=visible_count, total_lines=total_lines, can_scroll_up=can_scroll_up, can_scroll_down=can_scroll_down, lines_above=lines_above, lines_below=lines_below, ) def chrome_line(text: str, term_width: int) -> str: """Wrap *text* with the chrome background, padded to *term_width*.""" bg_text = BG_CHROME + text.replace(RESET, RESET + BG_CHROME) pad = max(0, term_width - visible_len(text)) return bg_text + " " * pad + RESET def build_progress_hunk_segment( current_index: int, hunks: list[Hunk], ) -> str: """Build the 'Hunk N/M' segment with a progress fill.""" total = len(hunks) handled = sum(1 for h in hunks if h.is_handled) label = f" Hunk {current_index + 1}/{total} " filled = len(label) * handled // total filled_part = label[:filled] unfilled_part = label[filled:] return f"│{BLACK}{BG_CYAN}{filled_part}{RESET}{unfilled_part}│" def build_top_bar( hunk: Hunk, current_index: int, hunks: list[Hunk], global_notes: list[GlobalNote], term_width: int = 0, ) -> str: """Build the top status bar string for the current hunk.""" counters = build_top_bar_counters(hunks, global_notes) progress_segment = build_progress_hunk_segment(current_index, hunks) bar = ( f"{BOLD}{WHITE}{BG_BLUE} neorev {RESET}" f"{progress_segment}" f" {BOLD}{hunk.short_location}{RESET}" f"{counters}" ) if term_width > 0: bar = truncate_ansi_text(bar, term_width) return bar def build_top_bar_counters( hunks: list[Hunk], global_notes: list[GlobalNote], ) -> str: """Build the compact note-count segment for the top bar.""" counts: Counter[tuple[type, NoteKind]] = Counter( (type(note.target), note.kind) for hunk in hunks for note in hunk.notes ) global_counts: Counter[NoteKind] = Counter(n.kind for n in global_notes) parts = [ f"global {NoteKind.FLAG.icon} {global_counts[NoteKind.FLAG]}" f" {NoteKind.QUESTION.icon} {global_counts[NoteKind.QUESTION]}", f"hunk {NoteKind.FLAG.icon} {counts[(HunkTarget, NoteKind.FLAG)]}" f" {NoteKind.QUESTION.icon} {counts[(HunkTarget, NoteKind.QUESTION)]}", f"line {NoteKind.FLAG.icon} {counts[(LineTarget, NoteKind.FLAG)]}" f" {NoteKind.QUESTION.icon} {counts[(LineTarget, NoteKind.QUESTION)]}", ] return " │ " + " │ ".join(parts) def hunk_marker(hunk: Hunk, *, is_current: bool) -> str: """Return the ANSI-styled marker for a single hunk (3 visible chars).""" icon = "·" color = DIM status = hunk.summary_status if status == Status.APPROVED: icon, color = "✓", GREEN elif status == Status.FLAG: icon, color = "✗", RED elif status == Status.QUESTION: icon, color = "?", YELLOW if is_current: return f"{BOLD}[{color}{icon}{RESET}{BOLD}]{RESET}" return f" {color}{icon}{RESET} " def build_progress_markers( hunks: list[Hunk], current_index: int, term_width: int, ) -> str: """Build the hunk progress marker line, with overflow arrows.""" total = len(hunks) prefix = " " prefix_width = 2 available = term_width - prefix_width if available < MARKER_WIDTH: return "" max_markers = available // MARKER_WIDTH if max_markers >= total: markers = [ hunk_marker(hunks[i], is_current=i == current_index) for i in range(total) ] return prefix + "".join(markers) slots = (available - ARROW_WIDTH * 2) // MARKER_WIDTH slots = max(slots, 1) half = slots // 2 start = current_index - half end = start + slots if start < 0: start, end = 0, slots if end > total: end = total start = max(0, end - slots) need_left = start > 0 need_right = end < total if not need_left or not need_right: extra = ( available - (ARROW_WIDTH if need_left or need_right else 0) ) // MARKER_WIDTH extra = max(extra, 1) if extra > slots: slots = min(extra, total) half = slots // 2 start = current_index - half end = start + slots if start < 0: start, end = 0, slots if end > total: end = total start = max(0, end - slots) need_left = start > 0 need_right = end < total markers = [ hunk_marker(hunks[i], is_current=i == current_index) for i in range(start, end) ] left = f"{DIM}◀ {RESET}" if need_left else "" right = f"{DIM} ▶{RESET}" if need_right else "" return prefix + left + "".join(markers) + right def build_keyhint_footer( segments: list[tuple[str, str]], term_width: int, *, ellipsis: bool = False ) -> str: """Build a key-hints footer from *segments*, truncated to *term_width*.""" parts: list[str] = [] used = 0 truncated = False for i, (key, label) in enumerate(segments): sep = " " if i > 0 else "" segment = f"{sep}{DIM}{key}{RESET} {label}" segment_width = visible_len(segment) if used + segment_width > term_width: truncated = True break parts.append(segment) used += segment_width if ellipsis and truncated and used < term_width: padding = term_width - used - len(TRUNCATION_ELLIPSIS) if padding >= 0: parts.append(f"{' ' * padding}{DIM}{TRUNCATION_ELLIPSIS}{RESET}") return "".join(parts) NOTE_PANEL_FOOTER_SEGMENTS = [ ("j/k", "navigate"), ("e/Enter", "edit"), ("d", "delete"), ("Esc", "back"), ] MAIN_FOOTER_SEGMENTS = [ ("j/k", "navigate"), ("a/A", "approve hunk/file"), ("c/f", "question/flag"), ("gc/gf", "global"), ("m", "manage notes"), ("q", "quit & save"), ("?", "help"), ] def build_note_panel_header(note_count: int, term_width: int) -> str: """Build the separator header line for the note panel.""" label = f" Notes ({note_count}) " if note_count else " Notes " fill_width = max(0, term_width - len(label) - 1) return f"─{label}{'─' * fill_width}" def ends_with_separator(s: str) -> bool: """Return whether *s* ends with a non-alphanumeric, non-whitespace character.""" return bool(s) and not s[-1].isalnum() and not s[-1].isspace() def note_preview_text(text: str) -> str: """Build a single-line preview from a possibly multi-line note.""" lines = [line.strip() for line in text.splitlines()] lines = [line for line in lines if line] parts: list[str] = [] for i, line in enumerate(lines): if parts and not ends_with_separator(lines[i - 1]): parts.append(PREVIEW_INTERPUNCT) elif parts: parts.append(" ") parts.append(line) return " ".join("".join(parts).split()) def build_note_panel_row( ref: ManagedNoteRef, *, selected: bool, term_width: int ) -> str: """Build one rendered row for a note in the panel.""" cursor_mark = f"{BOLD}{NOTE_PANEL_CURSOR}{RESET}" if selected else " " kind_str = ( f"{RED}flag{RESET}" if ref.kind is NoteKind.FLAG else f"{YELLOW}question{RESET}" ) preview = note_preview_text(ref.text) prefix = f"{cursor_mark}[{kind_str}] {DIM}{ref.scope_label}{RESET} " max_preview = term_width - visible_len(prefix) - 1 if max_preview > 0 and len(preview) > max_preview: preview = preview[: max_preview - 1] + TRUNCATION_ELLIPSIS elif max_preview <= 0: preview = "" return f"{prefix}{preview}" def compute_note_panel_height(note_count: int, term_height: int) -> int: """Compute the number of terminal rows the note panel should occupy.""" max_panel = max(NOTE_PANEL_MIN_HEIGHT, term_height // NOTE_PANEL_MAX_FRACTION) content_rows = max(note_count, 1) return min(content_rows + NOTE_PANEL_CHROME_ROWS, max_panel) def drain_fd(fd: int) -> None: """Read and discard all currently available bytes from *fd*.""" while True: ready, _, _ = select.select([fd], [], [], SELECT_IMMEDIATE) if not ready: break if not os.read(fd, DRAIN_READ_SIZE): break def debounce_resize(wakeup_read_fd: int) -> None: """Wait briefly to coalesce rapid resize signals into one redraw.""" while True: ready, _, _ = select.select([wakeup_read_fd], [], [], RESIZE_DEBOUNCE_TIMEOUT) if not ready: break drain_fd(wakeup_read_fd) class Terminal: """Terminal I/O state for the interactive TUI.""" DEFAULT_WIDTH = 120 DEFAULT_HEIGHT = 40 HELP_KEY_COLUMN_WIDTH = 14 HELP_BOX_MAX_WIDTH = 78 CLEAR_SCREEN = f"{CSI}2J{CSI}H" ALT_SCREEN_ON = f"{CSI}?1049h" ALT_SCREEN_OFF = f"{CSI}?1049l" CURSOR_HIDE = f"{CSI}?25l" CURSOR_SHOW = f"{CSI}?25h" ESC_BYTE = b"\x1b" ARROW_UP_SEQ = b"[A" ARROW_DOWN_SEQ = b"[B" KEY_CTRL_C = "\x03" KEY_CTRL_D = "\x04" KEY_CTRL_U = "\x15" KEY_RESIZE = "resize" def __init__(self) -> None: """Open ``/dev/tty`` read-write and capture geometry and attributes.""" self.fd = os.open("/dev/tty", os.O_RDWR) self.refresh_geometry() self.old_attrs: list[int | list[bytes | int]] = termios.tcgetattr(self.fd) self.wakeup_read_fd: int | None = None def __enter__(self) -> Self: """Return self for context-managed terminal usage.""" return self def __exit__(self, *_: object) -> None: """Close the terminal when leaving a context manager block.""" self.close() def refresh_geometry(self) -> None: """Re-read terminal dimensions, clamping to safe minimums.""" try: ts = os.get_terminal_size(self.fd) self.width = max(ts.columns, MIN_TERMINAL_WIDTH) self.height = max(ts.lines, MIN_TERMINAL_HEIGHT) except OSError: self.width = max( getattr(self, "width", self.DEFAULT_WIDTH), MIN_TERMINAL_WIDTH ) self.height = max( getattr(self, "height", self.DEFAULT_HEIGHT), MIN_TERMINAL_HEIGHT ) def apply_resize(self, delta_cache: dict[int, bytes]) -> None: """Refresh geometry and clear cache if width changed.""" old_width = self.width self.refresh_geometry() if self.width != old_width: delta_cache.clear() def close(self) -> None: """Restore terminal attributes and close the file descriptor.""" termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_attrs) os.close(self.fd) def write(self, data: bytes | str) -> None: """Write raw bytes or a string to the terminal display.""" if isinstance(data, str): data = data.encode() os.write(self.fd, data) def read_key(self, wakeup_read_fd: int | None = None) -> str: """Read one keypress, with optional wakeup pipe for resize signals.""" if wakeup_read_fd is not None: readable, _, _ = select.select([self.fd, wakeup_read_fd], [], []) if wakeup_read_fd in readable: drain_fd(wakeup_read_fd) debounce_resize(wakeup_read_fd) return self.KEY_RESIZE ch = os.read(self.fd, 1) if ch == self.ESC_BYTE: ready, _, _ = select.select([self.fd], [], [], ESC_SEQUENCE_TIMEOUT) if not ready: return "esc" seq = os.read(self.fd, 2) if seq == self.ARROW_UP_SEQ: return "up" if seq == self.ARROW_DOWN_SEQ: return "down" return "esc" return ch.decode("utf-8", errors="replace") def edit_comment( self, existing: str, location: str, context_lines: list[str] | None = None, ) -> str: """Edit a review comment using ``$EDITOR`` or an inline TTY prompt.""" editor = os.environ.get("EDITOR", "") if editor: return self.edit_comment_in_editor( existing, location, editor, context_lines ) return self.prompt_inline_comment(existing, location) def edit_comment_in_editor( self, existing: str, location: str, editor: str, context_lines: list[str] | None = None, ) -> str: """Open *editor* with a temporary file pre-filled with *existing*.""" with tempfile.TemporaryDirectory(prefix="review-") as tmpdir: tmppath = str(Path(tmpdir) / "comment.cfg") with open(tmppath, "w") as tmp: jump_line = write_comment_template( tmp, location, existing, context_lines ) with duplicated_fd(self.fd) as dup_fd: subprocess.run( [editor, f"{tmppath}:{jump_line}"], check=True, stdin=dup_fd, stdout=dup_fd, stderr=dup_fd, ) return read_comment_file(tmppath) def prompt_inline_comment(self, existing: str, location: str) -> str: """Prompt for a single-line comment directly on the TTY.""" prompt = f"\r\n Comment for {location}" if existing: prompt += f" [{existing}]" prompt += ": " self.write(prompt) old = termios.tcgetattr(self.fd) cooked = termios.tcgetattr(self.fd) cooked[3] |= termios.ECHO | termios.ICANON termios.tcsetattr(self.fd, termios.TCSADRAIN, cooked) dup_fd = os.dup(self.fd) try: with os.fdopen(dup_fd, "r") as f: line = f.readline().strip() finally: termios.tcsetattr(self.fd, termios.TCSADRAIN, old) return line or existing def edit_text_outside_tui( self, existing_text: str, prompt_label: str, context_lines: list[str] | None = None, ) -> str: """Temporarily suspend the TUI, edit text, and restore raw mode.""" self.write(f"{self.CURSOR_SHOW}{self.ALT_SCREEN_OFF}") termios.tcsetattr(self.fd, termios.TCSADRAIN, self.old_attrs) comment = self.edit_comment(existing_text, prompt_label, context_lines) self.refresh_geometry() self.write(f"{self.ALT_SCREEN_ON}{self.CURSOR_HIDE}") tty.setraw(self.fd) return comment def render_review_screen( self, hunks: list[Hunk], current_index: int, delta_output: bytes, global_notes: list[GlobalNote], scroll_offset: int = 0, ) -> int: """Render the full review screen and return the clamped scroll offset.""" hunk = hunks[current_index] margins, _ = build_margin_markers(hunk) diff_lines = build_display_lines(delta_output, self.width, margins) viewport = compute_diff_viewport(len(diff_lines), self.height, scroll_offset) self.write(self.CLEAR_SCREEN) top_bar = build_top_bar(hunk, current_index, hunks, global_notes, self.width) self.write(chrome_line(top_bar, self.width) + "\r\n") markers = build_progress_markers(hunks, current_index, self.width) self.write(chrome_line(markers, self.width) + "\r\n") self.render_diff_body(diff_lines, viewport) self.render_footer(viewport) return viewport.scroll_offset def render_diff_body( self, diff_lines: list[bytes], viewport: DiffViewport, ) -> None: """Render the visible diff lines with scroll indicators.""" if viewport.can_scroll_up: self.write( f" {BOLD}{WHITE}▲ Ctrl-U to scroll up " f"({viewport.lines_above} lines above){RESET}\r\n", ) start = viewport.scroll_offset end = start + viewport.visible_line_count for line in diff_lines[start:end]: self.write(line) self.write(b"\r\n") def render_footer( self, viewport: DiffViewport, footer_text: str | None = None ) -> None: """Render the scroll-down indicator and bottom key hints.""" if viewport.can_scroll_down: self.write(f"{CSI}{self.height - 1};1H") self.write( f" {BOLD}{WHITE}▼ Ctrl-D to scroll down " f"({viewport.lines_below} lines below){RESET}", ) self.write(f"{CSI}{self.height};1H") if footer_text is None: footer_text = build_keyhint_footer( MAIN_FOOTER_SEGMENTS, self.width, ellipsis=True ) self.write(chrome_line(footer_text, self.width)) def render_help_screen(self) -> None: """Display the keyboard shortcuts help overlay.""" self.write(self.CLEAR_SCREEN) bindings = [ ("j / ↓", "Next hunk"), ("k / ↑", "Previous hunk"), ("Ctrl-D", "Scroll diff down (half page)"), ("Ctrl-U", "Scroll diff up (half page)"), ("a", "Approve hunk (no comment needed)"), ("A", "Approve all hunks in current file"), ("c", "Add question (hunk or line target, opens $EDITOR)"), ("f", "Add flag / request change (hunk or line target, opens $EDITOR)"), ( "gc", "Add global question (not tied to a specific hunk, opens $EDITOR)", ), ( "gf", "Add global flag / request change (not tied to a specific hunk," " opens $EDITOR)", ), ("m", "Manage notes (edit / delete)"), ("q / Ctrl-C", "Quit and write output to file"), ("?", "Show help"), ] col = self.HELP_KEY_COLUMN_WIDTH max_w = self.HELP_BOX_MAX_WIDTH right_margin = 1 desc_max = max_w - 2 - col - right_margin # 2-char indent + key column + margin binding_rows: list[str] = [] for key, desc in bindings: desc_lines = wrap_plain_text(desc, desc_max) pad = " " * (col - len(key)) binding_rows.append(f" {BOLD}{key}{RESET}{pad}{desc_lines[0]}") cont_pad = " " * (2 + col) binding_rows.extend(f"{cont_pad}{cont}" for cont in desc_lines[1:]) rows = [ f"{BOLD} neorev — Keyboard Shortcuts{RESET}", "", *binding_rows, "", f" Set {BOLD}$EDITOR{RESET} (e.g. hx, nvim) for multi-line", " comments. Without it, single-line prompt is used.", "", f" {DIM}Press any key to return.{RESET}", ] width = max(visible_len(row) for row in rows) + right_margin border = "─" * width self.write(chrome_line(f"┌{border}┐", self.width) + "\r\n") self.write(chrome_line(f"├{border}┤", self.width) + "\r\n") self.write_box_rows(width, rows) self.write(chrome_line(f"└{border}┘", self.width) + "\r\n") def write_box_rows(self, width: int, rows: list[str]) -> None: """Write a sequence of bordered rows, treating empty strings as blanks.""" for row in rows: self.write_box_row(width, row) def write_box_row(self, width: int, text: str = "") -> None: """Write one bordered row padded to *width*.""" pad = width - visible_len(text) self.write(chrome_line(f"│{text}{' ' * pad}│", self.width) + "\r\n") def handle_manage_notes(self, state: ReviewState) -> None: """Interactive panel to manage all notes with cursor navigation.""" panel = NotePanelState() hunk = state.hunks[state.current_index] delta_output = render_through_delta(hunk.raw, width=self.width) while True: refs = state.managed_note_refs() if refs: panel.cursor = min(panel.cursor, len(refs) - 1) else: panel.cursor = 0 self.render_note_panel(state, refs, panel, delta_output) key = self.read_key(wakeup_read_fd=self.wakeup_read_fd) if key == self.KEY_RESIZE: self.refresh_geometry() if delta_output: hunk = state.hunks[state.current_index] delta_output = render_through_delta(hunk.raw, width=self.width) continue if key in ("q", "esc", self.KEY_CTRL_C): break if key in ("j", "down") and refs: panel.cursor = min(panel.cursor + 1, len(refs) - 1) elif key in ("k", "up") and refs: panel.cursor = max(panel.cursor - 1, 0) elif key in ("e", "\r") and refs: self.modify_managed_note(state, refs[panel.cursor], edit=True) tty.setraw(self.fd) elif key == "d" and refs: self.modify_managed_note(state, refs[panel.cursor], edit=False) def render_note_panel( self, state: ReviewState, refs: list[ManagedNoteRef], panel: NotePanelState, delta_output: bytes, ) -> None: """Render the diff with a note management panel at the bottom.""" panel_height = compute_note_panel_height(len(refs), self.height) diff_height = self.height - panel_height self.write(self.CLEAR_SCREEN) if delta_output and diff_height > CHROME_ROWS: self.render_note_panel_diff(state, delta_output, diff_height) panel_top = self.height - panel_height + 1 self.write(f"{CSI}{panel_top};1H") header = build_note_panel_header(len(refs), self.width) self.write(chrome_line(header, self.width) + "\r\n") content_rows = panel_height - NOTE_PANEL_CHROME_ROWS self.render_note_panel_rows(refs, panel, content_rows) self.write(f"{CSI}{self.height};1H") footer = build_keyhint_footer(NOTE_PANEL_FOOTER_SEGMENTS, self.width) self.write(chrome_line(footer, self.width)) def render_note_panel_diff( self, state: ReviewState, delta_output: bytes, diff_height: int, ) -> None: """Render the reduced-height diff above the note panel.""" hunk = state.hunks[state.current_index] margins, _ = build_margin_markers(hunk) diff_lines = build_display_lines(delta_output, self.width, margins) top_bar = build_top_bar( hunk, state.current_index, state.hunks, state.global_notes, self.width, ) self.write(chrome_line(top_bar, self.width) + "\r\n") markers = build_progress_markers(state.hunks, state.current_index, self.width) self.write(chrome_line(markers, self.width) + "\r\n") max_lines = diff_height - NOTE_PANEL_DIFF_CHROME_ROWS start = min(state.scroll_offset, max(0, len(diff_lines) - max_lines)) end = start + max_lines for line in diff_lines[start:end]: self.write(line) self.write(b"\r\n") def render_note_panel_rows( self, refs: list[ManagedNoteRef], panel: NotePanelState, content_rows: int, ) -> None: """Render the scrollable note list inside the panel.""" empty_chrome = chrome_line("", self.width) if not refs: self.write(chrome_line(f" {DIM}No notes yet.{RESET}", self.width) + "\r\n") for _ in range(content_rows - 1): self.write(empty_chrome + "\r\n") return if panel.cursor < panel.scroll_offset: panel.scroll_offset = panel.cursor elif panel.cursor >= panel.scroll_offset + content_rows: panel.scroll_offset = panel.cursor - content_rows + 1 for i in range(content_rows): idx = panel.scroll_offset + i if idx >= len(refs): self.write(empty_chrome + "\r\n") continue row = build_note_panel_row( refs[idx], selected=idx == panel.cursor, term_width=self.width, ) self.write(chrome_line(row, self.width) + "\r\n") def modify_managed_note( self, state: ReviewState, ref: ManagedNoteRef, *, edit: bool, ) -> None: """Edit or delete the note identified by *ref*.""" found = state.find_note_by_ref(ref) if found is None: return if isinstance(found, GlobalNote): if edit: new_text = self.edit_text_outside_tui( found.text, ref.kind.global_editor_label ) if new_text: found.text = new_text return state.global_notes.remove(found) return hunk, note = found if edit: if isinstance(note.target, LineTarget): ctx = hunk.build_line_context(note.target) else: ctx = hunk.build_hunk_context(state.scroll_offset) new_text = self.edit_text_outside_tui(note.text, ref.kind.editor_label, ctx) if new_text: note.text = new_text return hunk.notes.remove(note) def handle_hunk_comment(self, state: ReviewState, key: str) -> None: """Handle ``c`` (question) or ``f`` (flag) with target selection.""" hunk = state.hunks[state.current_index] kind = NoteKind.FLAG if key == "f" else NoteKind.QUESTION target: NoteTarget | None = ( self.pick_line_target(state, kind) if any(dl.target is not None for dl in hunk.display_lines) else HunkTarget() ) if target is None: return existing = hunk.get_note(target) existing_text = existing.text if existing else "" if isinstance(target, LineTarget): ctx = hunk.build_line_context(target) else: ctx = hunk.build_hunk_context(state.scroll_offset) comment = self.edit_text_outside_tui( existing_text, f"{hunk.short_location} @ {target} [{kind.editor_label}]", ctx, ) if comment: hunk.upsert_note(kind, target, comment) hunk.approved = False if isinstance(target, HunkTarget): state.current_index = state.find_next_unhandled() elif existing: hunk.remove_note(target) def pick_line_target(self, state: ReviewState, kind: NoteKind) -> NoteTarget | None: """Display a full-screen delta line picker and return the chosen target.""" hunk = state.hunks[state.current_index] selectable = [dl for dl in hunk.display_lines if dl.target is not None] if not selectable: return None cursor = initial_line_picker_cursor(selectable, hunk, state.scroll_offset) scroll = state.scroll_offset highlight = ( BG_YELLOW_BLACK.encode() if kind is NoteKind.FLAG else REVERSE.encode() ) delta_output = render_through_delta( hunk.raw, width=self.width, ) while True: scroll = self.render_line_picker( state, selectable, cursor, delta_output, scroll, highlight_escape=highlight, ) key = self.read_key(wakeup_read_fd=self.wakeup_read_fd) if key == self.KEY_RESIZE: self.refresh_geometry() delta_output = render_through_delta( hunk.raw, width=self.width, ) continue if key in ("q", "esc", self.KEY_CTRL_C): return None if key == "h": return HunkTarget() if key in ("j", "down"): cursor = min(cursor + 1, len(selectable) - 1) elif key in ("k", "up"): cursor = max(cursor - 1, 0) elif key == LINE_PICKER_SELECT_KEY: return selectable[cursor].target def render_line_picker( # noqa: PLR0913 self, state: ReviewState, selectable: list[DisplayLine], cursor: int, delta_output: bytes, scroll_offset: int, *, highlight_escape: bytes = REVERSE.encode(), ) -> int: """Render full-screen line picker with delta formatting.""" hunk = state.hunks[state.current_index] selected_target = selectable[cursor].target margin_markers, highlight_idx = build_margin_markers(hunk, selected_target) diff_lines = build_display_lines( delta_output, self.width, margin_markers, highlight_index=highlight_idx, highlight_escape=highlight_escape, ) # Ensure cursor line is visible cursor_idx = next( (i for i, dl in enumerate(hunk.display_lines) if dl is selectable[cursor]), None, ) if cursor_idx is not None: visible = self.height - CHROME_ROWS - SCROLL_INDICATOR_ROWS if cursor_idx < scroll_offset: scroll_offset = cursor_idx elif cursor_idx >= scroll_offset + visible: scroll_offset = cursor_idx - visible + 1 viewport = compute_diff_viewport( len(diff_lines), self.height, scroll_offset, ) self.write(self.CLEAR_SCREEN) top_bar = build_top_bar( hunk, state.current_index, state.hunks, state.global_notes, self.width, ) self.write(chrome_line(top_bar, self.width) + "\r\n") markers = build_progress_markers(state.hunks, state.current_index, self.width) self.write(chrome_line(markers, self.width) + "\r\n") self.render_diff_body(diff_lines, viewport) picker_footer = build_keyhint_footer(LINE_PICKER_FOOTER_SEGMENTS, self.width) self.render_footer(viewport, picker_footer) return viewport.scroll_offset def handle_global_note(self, state: ReviewState, key: str) -> None: """Handle ``gc`` (global question) or ``gf`` (global flag).""" kind = NoteKind.FLAG if key == "f" else NoteKind.QUESTION text = self.edit_text_outside_tui("", kind.global_editor_label) if text: state.global_notes.append(GlobalNote(kind=kind, text=text)) def handle_scroll( self, key: str, state: ReviewState, redraw: Callable[[], None], ) -> bool: """Handle scroll keys (Ctrl-D/Ctrl-U), returning True if handled.""" half = max(1, (self.height - CHROME_ROWS - SCROLL_INDICATOR_ROWS) // 2) if key == self.KEY_CTRL_D: state.scroll_offset += half elif key == self.KEY_CTRL_U: state.scroll_offset = max(0, state.scroll_offset - half) else: return False redraw() return True def dispatch_key( self, key: str, state: ReviewState, redraw: Callable[[], None], ) -> bool: """Handle a single keypress, returning True if a full redraw is needed.""" if self.handle_scroll(key, state, redraw): return False if state.navigate(key): state.scroll_offset = 0 return True if key == "a": state.approve() elif key == "A": state.approve_file() elif key in ("c", "f"): self.handle_hunk_comment(state, key) elif key == "g": key2 = self.read_key() if key2 not in ("c", "f"): return False self.handle_global_note(state, key2) elif key == "m": self.handle_manage_notes(state) elif key == "?": self.render_help_screen() self.read_key() else: return False return True def run_review_loop( self, state: ReviewState, delta_cache: dict[int, bytes], ) -> None: """Run the interactive key-handling loop until the user quits.""" with nonblocking_pipe() as (wakeup_r, wakeup_w): def get_delta(index: int) -> bytes: """Return cached delta output for hunk at *index*.""" if index not in delta_cache: delta_cache[index] = render_through_delta( state.hunks[index].raw, width=self.width, ) return delta_cache[index] def redraw() -> None: """Redraw the review screen.""" state.scroll_offset = self.render_review_screen( state.hunks, state.current_index, get_delta(state.current_index), state.global_notes, state.scroll_offset, ) prev_wakeup_fd = signal.set_wakeup_fd(wakeup_w, warn_on_full_buffer=False) prev_handler = signal.getsignal(signal.SIGWINCH) signal.signal(signal.SIGWINCH, lambda *_: None) self.wakeup_read_fd = wakeup_r try: tty.setraw(self.fd) redraw() while True: key = self.read_key(wakeup_read_fd=wakeup_r) if key == self.KEY_RESIZE: self.apply_resize(delta_cache) redraw() continue if key in ("q", self.KEY_CTRL_C): break handled = self.dispatch_key(key, state, redraw) if handled: redraw() finally: self.wakeup_read_fd = None signal.signal(signal.SIGWINCH, prev_handler) signal.set_wakeup_fd(prev_wakeup_fd) def build_arg_parser() -> argparse.ArgumentParser: """Build and return the argument parser for neorev.""" parser = argparse.ArgumentParser( prog="neorev", description=__doc__, epilog="When no diff is piped and no -g/-j revision is given, " "auto-detects the VCS backend and defaults to " f"'jj show {JJ_DEFAULT_REV}' or 'git show {GIT_DEFAULT_REV}'.", ) parser.add_argument( "-c", "--clear", action="store_true", help="discard previous review and start fresh", ) rev_group = parser.add_mutually_exclusive_group() rev_group.add_argument( "-g", "--git", nargs="?", const="", default=None, metavar="REV", help="run 'git show [REV]' and use its output as the diff", ) rev_group.add_argument( "-j", "--jj", nargs="?", const="", default=None, metavar="REV", help="run 'jj show [REV]' and use its output as the diff", ) parser.add_argument( "-o", "--output", default=None, help="output file for the review (created or updated); " "defaults to $XDG_STATE_HOME/neorev//.md", ) parser.add_argument( "-x", "--clip", action="store_true", help="copy @output_file to clipboard (via xclip) on exit", ) return parser @dataclass class VcsMetadata: """Project metadata extracted from the VCS for default output naming.""" dirname: str workspace: str = "" rev: str = "" def detect_vcs() -> Vcs | None: """Detect the VCS backend for the current directory.""" if Path(".jj").is_dir(): return Vcs.JUJUTSU if Path(".git").is_dir(): return Vcs.GIT return None def run_vcs(command: list[str]) -> str: """Run a VCS command and return stripped stdout.""" result = subprocess.run( command, stdin=subprocess.DEVNULL, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, check=True, ) return result.stdout.strip() def query_jj_metadata(rev: str = JJ_DEFAULT_REV) -> VcsMetadata: """Return project metadata from the current jujutsu repo.""" change_id = run_vcs( [ "jj", "log", "--no-graph", "--ignore-working-copy", "-r", rev, "-T", 'self.change_id().shortest() ++ "\\n"', ] ) workspace = "" ws_output = run_vcs(["jj", "workspace", "list", "--ignore-working-copy"]) first_line = ws_output.split("\n", 1)[0] ws_name = first_line.split(":")[0].strip() if ws_name and ws_name != JJ_DEFAULT_WORKSPACE: workspace = ws_name return VcsMetadata(dirname=Path.cwd().name, workspace=workspace, rev=change_id) def query_git_metadata(rev: str = GIT_DEFAULT_REV) -> VcsMetadata: """Return project metadata from the current git repo.""" short_hash = run_vcs(["git", "rev-parse", "--short", rev]) workspace = "" toplevel = Path(run_vcs(["git", "rev-parse", "--show-toplevel"])) porcelain = run_vcs(["git", "worktree", "list", "--porcelain"]) main_dir = "" for line in porcelain.splitlines(): if line.startswith("worktree "): main_dir = line.split(" ", 1)[1] break if main_dir and Path(main_dir) != toplevel: workspace = toplevel.name return VcsMetadata(dirname=Path.cwd().name, workspace=workspace, rev=short_hash) def query_vcs_metadata(vcs: Vcs, rev: str = "") -> VcsMetadata: """Return project metadata for the current project.""" try: if vcs == Vcs.JUJUTSU: return query_jj_metadata(rev) if rev else query_jj_metadata() if vcs == Vcs.GIT: return query_git_metadata(rev) if rev else query_git_metadata() except (FileNotFoundError, subprocess.CalledProcessError): pass return VcsMetadata(dirname=Path.cwd().name) def compact_path(path: str) -> str: """Replace the user's home directory prefix with ``~``.""" home = str(Path.home()) if path.startswith(home + os.sep) or path == home: return "~" + path[len(home) :] return path def default_output_path(vcs: Vcs, rev: str = "") -> str: """Build the default output path under the XDG state directory.""" xdg_state = os.environ.get( "XDG_STATE_HOME", str(Path.home() / XDG_STATE_HOME_DEFAULT), ) meta = query_vcs_metadata(vcs, rev) state_dir = Path(xdg_state) / NEOREV_STATE_DIR / meta.dirname state_dir.mkdir(parents=True, exist_ok=True) parts = [DEFAULT_OUTPUT_STEM] if meta.workspace: parts.append(meta.workspace) if meta.rev: parts.append(meta.rev) filename = DEFAULT_OUTPUT_SEPARATOR.join(parts) + DEFAULT_OUTPUT_EXT return str(state_dir / filename) def resolve_args(args: argparse.Namespace) -> None: """Fill in *args.output* with the default path when not provided.""" if args.output is not None: return if args.jj is not None: vcs = Vcs.JUJUTSU rev = args.jj elif args.git is not None: vcs = Vcs.GIT rev = args.git else: vcs = detect_vcs() if vcs is None: return rev = "" args.output = default_output_path(vcs, rev) def fetch_diff_from_vcs(command: list[str]) -> str: """Run a VCS command and return its stdout, exiting on failure.""" try: result = subprocess.run(command, capture_output=True, text=True, check=True) except FileNotFoundError: sys.stderr.write(f"Error: '{command[0]}' not found.\n") exit(os.EX_UNAVAILABLE) except subprocess.CalledProcessError as exc: sys.stderr.write(exc.stderr or f"Error: {command} failed.\n") exit(os.EX_DATAERR) if not result.stdout.strip(): sys.stderr.write("Empty diff.\n") exit(os.EX_DATAERR) return result.stdout def read_diff_from_stdin() -> str: """Read diff text from stdin, raising *NoDiffOnStdinError* when stdin is a TTY.""" if sys.stdin.isatty(): raise NoDiffOnStdinError diff_text = sys.stdin.read() if not diff_text.strip(): sys.stderr.write("Empty diff.\n") exit(os.EX_DATAERR) return diff_text def parse_diff_or_exit(diff_text: str) -> list[Hunk]: """Parse *diff_text* into hunks, exiting if none are found.""" hunks = parse_diff(diff_text) if not hunks: sys.stderr.write("No hunks found in diff.\n") exit(os.EX_DATAERR) return hunks def load_or_resume_review(hunks: list[Hunk], output_path: str) -> list[GlobalNote]: """ Load a previous review from *output_path* if it exists. Applies matching annotations and the approved-hunk bitmap to *hunks* in place and returns the list of global notes. """ previous_annotations, previous_global_notes, bitmap_encoded = load_previous_review( output_path, ) if not previous_annotations and not previous_global_notes and not bitmap_encoded: return [] matched = apply_previous_review(hunks, previous_annotations) stale = len(previous_annotations) - matched approved_from_bitmap = 0 if bitmap_encoded: approved = decode_approved_bitmap(bitmap_encoded, len(hunks)) for i, is_approved in enumerate(approved): if is_approved and not hunks[i].notes: hunks[i].approved = True approved_from_bitmap += 1 display_path = compact_path(output_path) sys.stderr.write( f"Loaded {matched} hunk annotation(s), " f"{approved_from_bitmap} approved hunk(s), and " f"{len(previous_global_notes)} global note(s) from {display_path}\n", ) if stale: sys.stderr.write( f" ({stale} previous annotation(s) no longer match any hunk)\n", ) return previous_global_notes def review_is_all_clear(hunks: list[Hunk], global_notes: list[GlobalNote]) -> bool: """Return True when every hunk is approved and there are no global notes.""" return all(h.approved for h in hunks) and not global_notes def write_review_output( output_path: str, hunks: list[Hunk], global_notes: list[GlobalNote], diff_source: str = "", ) -> None: """Write the final review output to *output_path*.""" review_output = format_output(hunks, global_notes, diff_source) Path(output_path).write_text(review_output) sys.stderr.write(f"Review written to {compact_path(output_path)}\n") def copy_output_reference_to_clipboard(output_path: str) -> None: """Copy ``@output_path`` to the system clipboard via ``xclip``.""" clip_text = f"@{compact_path(output_path)}" try: subprocess.run( ["xclip", "-selection", "clipboard"], input=clip_text.encode(), check=True, timeout=3, ) sys.stderr.write(f"Copied '{clip_text}' to clipboard\n") except FileNotFoundError: sys.stderr.write("xclip not found — install it to use --clip\n") except (subprocess.TimeoutExpired, subprocess.CalledProcessError) as exc: sys.stderr.write(f"Clipboard copy failed: {exc}\n") def main() -> None: """Run the neorev command-line entry point.""" parser = build_arg_parser() args = parser.parse_args() resolve_args(args) diff_source = "" if args.git is not None: cmd = ["git", "show"] + ([args.git] if args.git else []) diff_text = fetch_diff_from_vcs(cmd) diff_source = f"git show {args.git}".strip() elif args.jj is not None: cmd = ["jj", "show"] + ([args.jj] if args.jj else []) diff_text = fetch_diff_from_vcs(cmd) diff_source = f"jj show {args.jj}".strip() else: try: diff_text = read_diff_from_stdin() except NoDiffOnStdinError: vcs = detect_vcs() if vcs is None: parser.print_usage(sys.stderr) sys.stderr.write( "\nError: no diff on stdin and no VCS detected." " Pipe a diff or pass a revision with -g/-j.\n", ) exit(os.EX_USAGE) default_rev = JJ_DEFAULT_REV if vcs == Vcs.JUJUTSU else GIT_DEFAULT_REV cmd = [vcs, "show", default_rev] diff_text = fetch_diff_from_vcs(cmd) diff_source = f"{vcs} show {default_rev}" if args.output is None: args.output = default_output_path(vcs) hunks = parse_diff_or_exit(diff_text) if args.clear: Path(args.output).unlink(missing_ok=True) global_notes = load_or_resume_review(hunks, args.output) delta_cache: dict[int, bytes] = {} state = ReviewState( hunks=hunks, global_notes=global_notes, current_index=ReviewState.initial_index(hunks), ) with Terminal() as term: term.write(f"{term.ALT_SCREEN_ON}{term.CURSOR_HIDE}") try: term.run_review_loop(state, delta_cache) finally: term.write(f"{term.CURSOR_SHOW}{term.ALT_SCREEN_OFF}") if review_is_all_clear(hunks, global_notes): sys.stderr.write(ALL_CLEAR_MESSAGE) else: write_review_output(args.output, hunks, global_notes, diff_source) if args.clip: copy_output_reference_to_clipboard(args.output) if __name__ == "__main__": main()