#!/usr/bin/env python3 """ ARIS Research Wiki — Helper utilities. Canonical helper for the /research-wiki skill and integration hooks in other skills. The SKILL.md prose for paper-reading skills (research-lit, arxiv, alphaxiv, deepxiv, semantic-scholar, exa-search) delegates ingest to this script; no skill duplicates the page-creation schema. Usage: python3 research_wiki.py init python3 research_wiki.py slug "" --author "" --year 2025 python3 research_wiki.py add_edge --from --to --type --evidence "" python3 research_wiki.py rebuild_query_pack [--max-chars 8000] python3 research_wiki.py rebuild_index python3 research_wiki.py stats python3 research_wiki.py log "" # Canonical paper ingest (preferred by integration hooks): python3 research_wiki.py ingest_paper --arxiv-id \ [--thesis ""] [--tags tag1,tag2] [--update-on-exist] # Manual ingest when arXiv metadata is not available: python3 research_wiki.py ingest_paper \ --title "" --authors "A, B, C" --year 2025 \ --venue [--external-id-doi ] [--thesis "..."] [--tags ...] # Batch backfill: python3 research_wiki.py sync --arxiv-ids id1,id2,id3 python3 research_wiki.py sync --from-file ids.txt # Claim layer (PROVE/JUDGE output ledger): python3 research_wiki.py add_claim --slug b1-main-ub \ --name "..." --status sound-modulo-imports --provenance \ --statement "..." --scope "..." --evidence "..." \ --addresses G2,G10 --extends paper:slug --uses paper:slug \ --depends-on claim:other --refutes claim:bad """ # `from __future__ import annotations` defers annotation evaluation so that # PEP 604 union syntax (`Path | None`) used below works on Python 3.7+ — # without it the module fails to import on the macOS system default # (`/usr/bin/python3` = 3.9.6), which is a path that many community users # end up on if they have not installed a newer Python via miniforge / brew / # pyenv. The helper is otherwise pure-stdlib. from __future__ import annotations import argparse import json import os import re import sys import time import urllib.request import urllib.error import xml.etree.ElementTree as ET from datetime import datetime, timezone from pathlib import Path # Injection scanner (sibling helper in tools/). Wiki content is re-injected into # agent context (query_pack → /idea-creator; edge evidence summarized for humans), # so scan before persist. Best-effort: if the helper is unavailable, writes proceed # unscanned rather than break — the cross-model jury remains the correctness gate # either way (see shared-references/injection-hygiene.md). Layer 1 of 2. try: from threat_scan import scan_for_threats, quarantine except ImportError: # imported from a different cwd sys.path.insert(0, str(Path(__file__).resolve().parent)) try: from threat_scan import scan_for_threats, quarantine except ImportError: scan_for_threats = None # type: ignore quarantine = None # type: ignore _ARXIV_API = "https://export.arxiv.org/api/query?id_list={ids}" _ARXIV_NS = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"} def _arxiv_user_agent() -> str: """Descriptive User-Agent for arXiv API calls. arXiv rate-limits the default ``Python-urllib/x.y`` agent far more aggressively than a named client; sending a descriptive UA (with an optional contact address) lands requests in arXiv's more lenient pool. The contact is read from ``ARIS_VERIFY_EMAIL`` — the same env var the /research-lit skill already uses for the CrossRef polite pool — so no address is hard-coded. Falls back to a contactless UA when unset. """ contact = os.environ.get("ARIS_VERIFY_EMAIL", "").strip() base = ("ARIS-research-wiki/1.0 " "(+https://github.com/wanshuiyin/Auto-claude-code-research-in-sleep)") return f"{base} (mailto:{contact})" if contact else base def slugify(title: str, author_last: str = "", year: int = 0) -> str: """Generate a canonical slug: author_last + year + keyword.""" # Extract first meaningful word from title stop_words = {"a", "an", "the", "of", "for", "in", "on", "with", "via", "and", "to", "by"} words = re.sub(r"[^a-z0-9\s]", "", title.lower()).split() keywords = [w for w in words if w not in stop_words and len(w) > 2] keyword = "_".join(keywords[:3]) if keywords else "untitled" author = re.sub(r"[^a-z]", "", author_last.lower()) if author_last else "unknown" yr = str(year) if year else "0000" return f"{author}{yr}_{keyword}" def init_wiki(wiki_root: str): """Initialize wiki directory structure.""" root = Path(wiki_root) dirs = ["papers", "ideas", "experiments", "claims", "graph"] for d in dirs: (root / d).mkdir(parents=True, exist_ok=True) # Create empty files if they don't exist for f in ["index.md", "log.md", "gap_map.md", "query_pack.md"]: path = root / f if not path.exists(): if f == "index.md": path.write_text("# Research Wiki Index\n\n_Auto-generated. Do not edit._\n") elif f == "log.md": path.write_text("# Research Wiki Log\n\n_Append-only timeline._\n") elif f == "gap_map.md": path.write_text("# Gap Map\n\n_Field gaps with stable IDs._\n") elif f == "query_pack.md": path.write_text("# Query Pack\n\n_Auto-generated for /idea-creator. Max 8000 chars._\n") # Create empty edges file edges_path = root / "graph" / "edges.jsonl" if not edges_path.exists(): edges_path.write_text("") append_log(wiki_root, "Wiki initialized") print(f"Research wiki initialized at {root}") def add_edge(wiki_root: str, from_id: str, to_id: str, edge_type: str, evidence: str = ""): """Add a typed edge to the relationship graph.""" VALID_TYPES = { "extends", "contradicts", "addresses_gap", "inspired_by", "tested_by", "supports", "invalidates", "supersedes", # Claim-layer edge types (additive, used by add_claim): # claim --depends_on--> claim (proof-obligation dependency) # claim --refutes--> claim (a claim that falsifies another) # claim --uses--> paper (imports a result/ingredient) "depends_on", "refutes", "uses", } if edge_type not in VALID_TYPES: print(f"Warning: unknown edge type '{edge_type}'. Valid: {VALID_TYPES}", file=sys.stderr) edges_path = Path(wiki_root) / "graph" / "edges.jsonl" # Dedup check existing_edges = [] if edges_path.exists(): for line in edges_path.read_text().strip().split("\n"): if line.strip(): try: existing_edges.append(json.loads(line)) except json.JSONDecodeError: continue # Check if edge already exists for e in existing_edges: if e.get("from") == from_id and e.get("to") == to_id and e.get("type") == edge_type: print(f"Edge already exists: {from_id} --{edge_type}--> {to_id}") return # Quarantine edge evidence (model/web-authored, re-read into context): # neutralize an injection payload but keep the edge structure intact. safe_evidence = evidence if quarantine is not None and evidence: safe_evidence, findings = quarantine( evidence, scope="strict", label=f"edge {from_id} -> {to_id}") if findings: # Fail-closed WITH visibility: the graph gets the placeholder; the # raw flagged text + findings go to a reviewable quarantine log so a # human can inspect it. Nothing is silently dropped. qlog = Path(wiki_root) / "graph" / "quarantine.log" with open(qlog, "a", encoding="utf-8") as f: f.write(json.dumps({ "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "edge": f"{from_id} --{edge_type}--> {to_id}", "findings": findings, "raw_evidence": evidence, }, ensure_ascii=False) + "\n") print(f"⚠️ edge evidence quarantined (threat pattern: " f"{', '.join(findings)}); placeholder in graph, raw text " f"preserved in graph/quarantine.log for review.", file=sys.stderr) edge = { "from": from_id, "to": to_id, "type": edge_type, "evidence": safe_evidence, "added": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), } with open(edges_path, "a") as f: f.write(json.dumps(edge, ensure_ascii=False) + "\n") print(f"Edge added: {from_id} --{edge_type}--> {to_id}") def rebuild_query_pack(wiki_root: str, max_chars: int = 8000): """Generate a compressed query_pack.md for /idea-creator.""" root = Path(wiki_root) sections = [] # 1. Project direction — structured extraction from RESEARCH_BRIEF.md # Parses the template-defined ## sections to preserve key fields # (problem, constraints, direction) that flat truncation would miss. # Deterministic, no LLM. brief_path = root.parent / "RESEARCH_BRIEF.md" if brief_path.exists(): raw = brief_path.read_text() # Parse ## sections from the brief sections_map: dict[str, str] = {} current_heading = "" current_lines: list[str] = [] for line in raw.split("\n"): if line.startswith("## "): if current_heading: sections_map[current_heading] = "\n".join(current_lines).strip() current_heading = line[3:].strip() current_lines = [] elif current_heading: current_lines.append(line) if current_heading: sections_map[current_heading] = "\n".join(current_lines).strip() def _section(name: str) -> str | None: # Exact match first, then a tolerant match so template drift # ("Existing Results" vs "Existing Results (if any)", trailing # punctuation, case) still resolves to the intended section. text = sections_map.get(name, "").strip() if not text: want = name.lower().rstrip(":").strip() for k, v in sections_map.items(): kk = k.lower().rstrip(":").strip() if kk == want or kk.startswith(want) or want.startswith(kk): text = v.strip() if text: break return text if text else None # Priority order for /idea-creator: problem → constraints → direction → background parts: list[str] = [] for label, heading in [ ("Problem", "Problem Statement"), ("Constraints", "Constraints"), ("Direction", "What I'm Looking For"), ("Background", "Background"), ("Non-goals", "Non-Goals"), ("Domain Knowledge", "Domain Knowledge"), ("Existing Results", "Existing Results (if any)"), ]: text = _section(heading) if text: parts.append(f"**{label}**\n\n{text}") if parts: brief = "\n\n".join(parts) sections.append(f"## Project Direction\n{brief}\n") else: # Fallback: the brief uses none of the template's known headings # (custom template, or a free-form brief). Don't silently drop the # whole brief — fall back to the original flat-slice behavior so # /idea-creator still gets *some* project context. flat = raw.strip()[:600] if flat: sections.append(f"## Project Direction\n{flat}\n") # 2. Gap map (1200 chars) gap_path = root / "gap_map.md" if gap_path.exists(): gaps = gap_path.read_text()[:1200] if gaps.strip() and gaps.strip() != "# Gap Map\n\n_Field gaps with stable IDs._": sections.append(f"## Open Gaps\n{gaps}\n") # 3. Failed ideas (1400 chars) — highest anti-repetition value ideas_dir = root / "ideas" if ideas_dir.exists(): failed = [] for f in sorted(ideas_dir.glob("*.md")): meta = _load_paper_frontmatter(f) # Gate on the FRONTMATTER outcome only (not a full-text substring): a # `pending` idea whose body discusses an "outcome: negative" failure mode # must NOT be banlisted. if meta.get("outcome") in ("negative", "mixed"): content = f.read_text() lines = content.split("\n") title = meta.get("title", "") failure = "" for line in lines: if "failure" in line.lower() or "lesson" in line.lower(): idx = lines.index(line) failure = "\n".join(lines[idx:idx+3]) if title: failed.append(f"- **{title}**: {failure[:200]}") if failed: failed_text = "\n".join(failed)[:1400] sections.append(f"## Failed Ideas (avoid repeating)\n{failed_text}\n") # 4. Paper summaries (1800 chars) — top by relevance papers_dir = root / "papers" if papers_dir.exists(): paper_summaries = [] for f in sorted(papers_dir.glob("*.md")): content = f.read_text() # Extract one-line thesis and key fields node_id = "" title = "" thesis = "" for line in content.split("\n"): if line.startswith("node_id:"): node_id = line.split(":", 1)[1].strip() if line.startswith("title:"): title = line.split(":", 1)[1].strip().strip('"') if line.startswith("# One-line thesis"): idx = content.split("\n").index(line) next_lines = content.split("\n")[idx+1:idx+3] thesis = " ".join(l for l in next_lines if l.strip() and not l.startswith("#")) if title: suffix = f": {thesis[:150]}" if thesis.strip() else "" paper_summaries.append(f"- [{node_id}] {title}{suffix}") if paper_summaries: papers_text = "\n".join(paper_summaries[:12])[:1800] sections.append(f"## Key Papers ({len(paper_summaries)} total)\n{papers_text}\n") # 5. Active relationship chains (900 chars) edges_path = root / "graph" / "edges.jsonl" if edges_path.exists(): edges = [] for line in edges_path.read_text().strip().split("\n"): if line.strip(): try: edges.append(json.loads(line)) except json.JSONDecodeError: continue if edges: chains = [] for e in edges[-20:]: # recent edges chains.append(f" {e['from']} --{e['type']}--> {e['to']}") chains_text = "\n".join(chains)[:900] sections.append(f"## Recent Relationships ({len(edges)} total)\n{chains_text}\n") # Assemble pack = "# Research Wiki Query Pack\n\n_Auto-generated. Do not edit._\n\n" for s in sections: if len(pack) + len(s) <= max_chars: pack += s else: remaining = max_chars - len(pack) - 20 if remaining > 100: chunk = s[:remaining] # Snap to last line break to avoid mid-sentence cut last_nl = chunk.rfind("\n") if last_nl > remaining // 2: chunk = chunk[:last_nl] pack += chunk + "\n...(truncated)\n" break # The query_pack is injected verbatim into /idea-creator. Scan it (don't # blank it — it's assembled from many nodes) and, if a node carried an # injection payload, prepend a visible banner so the consumer treats any # embedded directive as DATA, not instructions, and fixes the source node. if scan_for_threats is not None: findings = scan_for_threats(pack, scope="strict") if findings: print(f"⚠️ query_pack flagged (threat pattern: {', '.join(findings)}) " f"— a wiki node carries an injection-like payload; review nodes.", file=sys.stderr) pack = ( f"\n\n" + pack ) pack_path = root / "query_pack.md" pack_path.write_text(pack) print(f"query_pack.md rebuilt: {len(pack)} chars") def get_stats(wiki_root: str): """Print wiki statistics.""" root = Path(wiki_root) def count_files(subdir): d = root / subdir return len(list(d.glob("*.md"))) if d.exists() else 0 def count_by_field(subdir, field, value): d = root / subdir if not d.exists(): return 0 count = 0 for f in d.glob("*.md"): # Read the FRONTMATTER field only — not a full-text substring, so body # text mentioning e.g. "outcome: negative" can't inflate the count. if _load_paper_frontmatter(f).get(field) == value: count += 1 return count papers = count_files("papers") ideas = count_files("ideas") experiments = count_files("experiments") claims = count_files("claims") edges_path = root / "graph" / "edges.jsonl" edge_count = 0 if edges_path.exists(): edge_count = sum(1 for line in edges_path.read_text().strip().split("\n") if line.strip()) print(f"📚 Research Wiki Stats") print(f"Papers: {papers}") print(f"Ideas: {ideas} ({count_by_field('ideas', 'outcome', 'negative')} failed, " f"{count_by_field('ideas', 'outcome', 'positive')} succeeded)") print(f"Experiments: {experiments}") _claim_parts = [] for _st in sorted(_CLAIM_STATUSES): _n = count_by_field('claims', 'status', _st) if _n: _claim_parts.append(f"{_n} {_st}") print(f"Claims: {claims}" + (f" ({', '.join(_claim_parts)})" if _claim_parts else "")) print(f"Edges: {edge_count}") print(f"Wiki root: {root}") def _normalize_arxiv_id(arxiv_id: str) -> str: """Strip common prefixes and version suffix from arxiv id. Preserves legacy category-prefixed IDs: `cs/0601001`, `cs.LG/0703124` stay as-is (minus any trailing vN); modern IDs like `2501.12345v2` become `2501.12345`. The arXiv API accepts both forms via `id_list=`. """ s = arxiv_id.strip() for prefix in ("arXiv:", "arxiv:", "http://arxiv.org/abs/", "https://arxiv.org/abs/"): if s.lower().startswith(prefix.lower()): s = s[len(prefix):] # Never split on '/' — legacy IDs are `category/NNNNNNN`. s = re.sub(r"v\d+$", "", s) return s def _yaml_quote(s: str) -> str: """YAML double-quoted string escape: backslash and double-quote. Frontmatter values containing a literal `"` (e.g. titles like `Foo "Bar" Baz`) would otherwise corrupt the page. Tabs and newlines inside metadata fields are also normalized. """ if s is None: return '""' s = str(s).replace("\r", "").replace("\t", " ") s = s.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ") return f'"{s}"' def _arxiv_api_get(url: str, what: str, timeout: float = 15.0) -> bytes: """GET an arXiv API URL with a descriptive User-Agent + retry/backoff. Centralizes the rate-limit handling shared by the single and batch fetchers: sends ``_arxiv_user_agent()`` (lands in arXiv's lenient pool), retries up to 3 times on HTTP 429, transient network errors, and the plain-text "Rate exceeded." body the API sometimes returns with 200 OK. ``what`` is a label for error messages (e.g. the id or id-list). """ req = urllib.request.Request(url, headers={"User-Agent": _arxiv_user_agent()}) for attempt in (1, 2, 3): try: with urllib.request.urlopen(req, timeout=timeout) as resp: body = resp.read() except urllib.error.HTTPError as e: if e.code == 429 and attempt < 3: time.sleep(5 * attempt) continue raise RuntimeError(f"arXiv API fetch failed for {what}: {e}") except (urllib.error.URLError, TimeoutError, OSError) as e: if attempt < 3: time.sleep(2 * attempt) continue raise RuntimeError(f"arXiv API fetch failed for {what}: {e}") if body.strip() == b"Rate exceeded.": if attempt < 3: time.sleep(5 * attempt) continue raise RuntimeError(f"arXiv API rate-limited for {what} after 3 attempts") return body return b"" # unreachable; loop either returns or raises def _parse_arxiv_entry(entry) -> dict: """Parse one Atom element into the metadata dict shape.""" def _txt(el, default=""): return el.text.strip() if el is not None and el.text else default title = re.sub(r"\s+", " ", _txt(entry.find("atom:title", _ARXIV_NS))) summary = re.sub(r"\s+", " ", _txt(entry.find("atom:summary", _ARXIV_NS))) published = _txt(entry.find("atom:published", _ARXIV_NS)) year = int(published[:4]) if published[:4].isdigit() else 0 authors = [] for a in entry.findall("atom:author", _ARXIV_NS): n = _txt(a.find("atom:name", _ARXIV_NS)) if n: authors.append(n) primary = entry.find("arxiv:primary_category", _ARXIV_NS) primary_cat = primary.get("term") if primary is not None else "" journal_ref = _txt(entry.find("arxiv:journal_ref", _ARXIV_NS)) venue = journal_ref if journal_ref else "arXiv" # The element holds e.g. http://arxiv.org/abs/2510.23672v1 — recover # the bare, version-stripped id so batch results can be keyed back to the # ids the caller asked for. raw_id = _txt(entry.find("atom:id", _ARXIV_NS)) aid = _normalize_arxiv_id(raw_id.rsplit("/abs/", 1)[-1]) if raw_id else "" return { "arxiv_id": aid, "title": title, "authors": authors, "year": year, "venue": venue, "abstract": summary, "primary_category": primary_cat, } def fetch_arxiv_metadata(arxiv_id: str, timeout: float = 15.0) -> dict: """Query arXiv Atom API for one paper. Returns a metadata dict. Sends a descriptive User-Agent and retries up to 3 times on arXiv rate limits (HTTP 429 or the plain-text "Rate exceeded." body) and transient network errors. Raises RuntimeError when all retries are exhausted — callers decide whether to abort the ingest or fall back to manual metadata. """ aid = _normalize_arxiv_id(arxiv_id) body = _arxiv_api_get(_ARXIV_API.format(ids=aid), aid, timeout=timeout) try: root = ET.fromstring(body) except ET.ParseError as e: raise RuntimeError(f"arXiv API returned unparseable XML for {aid}: {e}") entry = root.find("atom:entry", _ARXIV_NS) if entry is None: raise RuntimeError(f"arXiv API returned no entry for {aid}") meta = _parse_arxiv_entry(entry) # The single-id query is authoritative for the id even if parsing # came up empty (e.g. malformed feed); keep the caller's normalized id. meta["arxiv_id"] = aid return meta def fetch_arxiv_metadata_batch(arxiv_ids: list[str], timeout: float = 30.0) -> dict: """Fetch metadata for many papers in ONE arXiv request via id_list. arXiv's ``id_list`` parameter accepts a comma-separated list and returns all entries in a single Atom feed, so N papers cost 1 request instead of N — the structural fix for the burst-429 problem when ingesting a batch. Returns ``{normalized_id: meta}``; ids the API did not return are simply absent from the dict (caller decides how to handle misses). """ norm = [_normalize_arxiv_id(a.strip()) for a in arxiv_ids if a and a.strip()] if not norm: return {} # arXiv defaults max_results to 10, so an id_list of >10 silently returns # only the first 10 entries — set max_results to the full count so all # requested papers come back in the single request. url = _ARXIV_API.format(ids=",".join(norm)) + f"&max_results={len(norm)}" body = _arxiv_api_get(url, f"id_list[{len(norm)}]", timeout=timeout) try: root = ET.fromstring(body) except ET.ParseError as e: raise RuntimeError(f"arXiv API returned unparseable XML for batch: {e}") out: dict = {} for entry in root.findall("atom:entry", _ARXIV_NS): meta = _parse_arxiv_entry(entry) if meta.get("arxiv_id"): out[meta["arxiv_id"]] = meta return out def _last_name(full_name: str) -> str: """Crude last-name extraction for slug generation.""" parts = full_name.strip().split() return parts[-1] if parts else "" def _load_paper_frontmatter(path: Path) -> dict: """Parse the YAML-ish frontmatter of a wiki paper page. Returns {} on failure.""" if not path.exists(): return {} text = path.read_text() m = re.match(r"^---\n(.*?)\n---", text, re.DOTALL) if not m: return {} meta = {} for line in m.group(1).split("\n"): if ":" not in line: continue key, _, value = line.partition(":") meta[key.strip()] = value.strip().strip('"').strip("'") return meta def _find_existing_page_by_arxiv(wiki_root: Path, arxiv_id: str) -> Path | None: papers = wiki_root / "papers" if not papers.exists(): return None for p in papers.glob("*.md"): text = p.read_text() # Match either the frontmatter line or a URL reference if re.search(r'arxiv:\s*["\']?' + re.escape(arxiv_id) + r'["\']?', text): return p if re.search(r"arxiv\.org/abs/" + re.escape(arxiv_id), text): return p return None def _render_paper_page(meta: dict, slug: str, thesis: str, tags: list[str]) -> str: """Render the markdown paper page following research-wiki SKILL.md schema.""" fm = { "type": "paper", "node_id": f"paper:{slug}", "title": meta.get("title", ""), "authors": meta.get("authors", []), "year": meta.get("year", 0), "venue": meta.get("venue", "arXiv"), "tags": tags, } external_ids = { "arxiv": meta.get("arxiv_id", ""), "doi": meta.get("doi", ""), "s2": meta.get("s2_id", ""), } lines = ["---"] lines.append(f"type: {fm['type']}") lines.append(f"node_id: {fm['node_id']}") lines.append(f"title: {_yaml_quote(fm['title'])}") lines.append("authors: [" + ", ".join(_yaml_quote(a) for a in fm["authors"]) + "]") lines.append(f"year: {fm['year']}") lines.append(f"venue: {_yaml_quote(fm['venue'])}") lines.append("external_ids:") for k, v in external_ids.items(): value_str = _yaml_quote(v) if v else "null" lines.append(f" {k}: {value_str}") lines.append("tags: [" + ", ".join(_yaml_quote(t) for t in tags) + "]") lines.append(f"added: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}") lines.append("---") lines.append("") lines.append(f"# {fm['title']}") lines.append("") lines.append("## One-line thesis") lines.append(thesis or "_TODO: fill in after reading._") lines.append("") lines.append("## Problem / Gap") lines.append("_TODO._") lines.append("") lines.append("## Method") lines.append("_TODO._") lines.append("") lines.append("## Key Results") lines.append("_TODO._") lines.append("") lines.append("## Assumptions") lines.append("_TODO._") lines.append("") lines.append("## Limitations / Failure Modes") lines.append("_TODO._") lines.append("") lines.append("## Reusable Ingredients") lines.append("_TODO._") lines.append("") lines.append("## Open Questions") lines.append("_TODO._") lines.append("") lines.append("## Claims") lines.append("_TODO._") lines.append("") lines.append("## Connections") lines.append("_Edges are recorded in `graph/edges.jsonl`; summarize here for human readers._") lines.append("") lines.append("## Relevance to This Project") lines.append("_TODO._") lines.append("") if meta.get("abstract"): lines.append("## Abstract (original)") lines.append("") lines.append("> " + meta["abstract"]) lines.append("") return "\n".join(lines) + "\n" def ingest_paper(wiki_root: str, *, arxiv_id: str = "", title: str = "", authors: list[str] | None = None, year: int = 0, venue: str = "", doi: str = "", thesis: str = "", tags: list[str] | None = None, update_on_exist: bool = False, prefetched_meta: dict | None = None) -> Path: """Canonical paper-ingest entrypoint. Preferred: pass --arxiv-id and let the helper fetch metadata. If the arXiv lookup fails (offline, unknown id), callers may supply title/authors/year/venue manually; doi is optional. Always: - slugs the title (author + year + keyword) - dedups by arxiv_id first, then by slug — `update_on_exist=False` skips rewriting an existing page - creates papers/.md with the schema from research-wiki SKILL.md - rebuilds index.md and query_pack.md - appends to log.md """ root = Path(wiki_root) if not (root / "papers").exists(): raise RuntimeError(f"{root} is not an initialized wiki (papers/ missing). " f"Run `init` first.") tags = tags or [] authors = authors or [] meta: dict = {} existing: Path | None = None # populated when we find a prior page (by arxiv or slug) if arxiv_id: aid = _normalize_arxiv_id(arxiv_id) existing = _find_existing_page_by_arxiv(root, aid) if existing and not update_on_exist: # Contract §3: every activation leaves a receipt. Log the skip # so a repeated hook invocation is still observable. append_log(str(root), f"ingest_paper: skipped existing paper " f"{existing.name} (arxiv:{aid})") print(f"Paper already ingested: {existing.name} (arxiv:{aid}) — skipping.") return existing if prefetched_meta is not None: # Batch path (sync): metadata already fetched in one id_list call; # skip the per-id network round-trip entirely. meta = dict(prefetched_meta) meta.setdefault("arxiv_id", aid) else: try: meta = fetch_arxiv_metadata(aid) except RuntimeError as e: if title: # caller provided manual fallback print(f"Warning: {e} — falling back to manual metadata.", file=sys.stderr) meta = {"arxiv_id": aid} else: raise # Manual overrides on top of fetched metadata if title: meta["title"] = title if authors: meta["authors"] = authors if year: meta["year"] = year if venue: meta["venue"] = venue else: if not (title and authors and year): raise RuntimeError("Manual ingest requires --title, --authors, and --year " "when --arxiv-id is not supplied.") meta = { "arxiv_id": "", "title": title, "authors": authors, "year": year, "venue": venue or "unknown", } if doi: meta["doi"] = doi author_last = _last_name(meta["authors"][0]) if meta.get("authors") else "" slug = slugify(meta["title"], author_last, meta.get("year", 0)) # If we already found a prior page by arXiv-id dedup, reuse its path and # slug even if the newly-computed slug differs (e.g., title metadata # fluctuated between runs). Otherwise check slug-based dedup. if existing: page_path = existing slug = existing.stem was_update = True else: page_path = root / "papers" / f"{slug}.md" if page_path.exists(): if not update_on_exist: append_log(str(root), f"ingest_paper: skipped existing paper " f"{page_path.name} (slug dedup)") print(f"Paper already ingested: {page_path.name} (slug dedup) — skipping.") return page_path was_update = True else: was_update = False rendered = _render_paper_page(meta, slug, thesis, tags) page_path.write_text(rendered) # Rebuild derived artifacts rebuild_index(str(root)) rebuild_query_pack(str(root)) action = "updated" if was_update else "ingested" append_log(str(root), f"ingest_paper: {action} paper:{slug} " f"(arxiv:{meta.get('arxiv_id','-')})") print(f"Paper {action}: {page_path}") return page_path _CLAIM_STATUSES = { "drafted", # written, not yet adversarially reviewed "unproven", # audited; proof has an open gap (not closed, not falsified) "sound-modulo-imports", # proof closes modulo flagged [unverified-axiom] imports "verified", # passed cross-model acquittal, imports discharged "refuted", # a counterexample / jury falsified it "retracted", # withdrawn (e.g. superseded by a corrected claim) } def _claim_slugify(name: str, slug: str = "") -> str: """Slug for a claim page. A claim usually already carries a stable human ID (e.g. ``b1-main-ub``); if the caller passes ``--slug`` we honor it verbatim (lower-cased, sanitized) so cross-references in proofs stay stable. Otherwise we fall back to the paper slugifier's keyword extraction on the claim name. """ if slug: s = re.sub(r"[^a-z0-9._-]+", "-", slug.strip().lower()).strip("-") if s: return s # No explicit slug: reuse the title keyword extractor (no author/year). return slugify(name).lstrip("_").lstrip("0").strip("_") or "claim" def _render_claim_page(slug: str, name: str, description: str, status: str, provenance: str, statement: str, scope: str, evidence: str, tags: list[str]) -> str: """Render a claims/.md page following the research-wiki schema. Mirrors ``_render_paper_page``: ``---`` frontmatter block then body sections. ``node_type: claim`` distinguishes the node kind; ``status`` is one of ``_CLAIM_STATUSES``; ``provenance`` points at the PROVE/JUDGE run directory that produced the claim (the honesty receipt). """ lines = ["---"] lines.append("type: claim") lines.append(f"node_id: claim:{slug}") lines.append(f"name: {_yaml_quote(name)}") lines.append(f"description: {_yaml_quote(description)}") lines.append("node_type: claim") lines.append(f"status: {status}") lines.append(f"provenance: {_yaml_quote(provenance)}") lines.append("tags: [" + ", ".join(_yaml_quote(t) for t in tags) + "]") lines.append(f"date: {datetime.now(timezone.utc).strftime('%Y-%m-%d')}") lines.append(f"added: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}") lines.append("---") lines.append("") lines.append(f"# {name}") lines.append("") lines.append(f"**status:** `{status}`") lines.append("") lines.append("## Statement") lines.append(statement.strip() if statement.strip() else "_TODO: formal statement._") lines.append("") lines.append("## Honest scope") lines.append(scope.strip() if scope.strip() else "_TODO: what this claim does NOT say; banned wordings; flagged imports._") lines.append("") lines.append("## Evidence chain") lines.append(evidence.strip() if evidence.strip() else "_TODO: proof obligations, jury verdicts, provenance pointers._") lines.append("") lines.append("## Connections") lines.append("_Edges are recorded in `graph/edges.jsonl`; summarize here for human readers._") lines.append("") return "\n".join(lines) + "\n" def add_claim(wiki_root: str, slug: str, name: str, *, description: str = "", status: str = "drafted", provenance: str = "", statement: str = "", scope: str = "", evidence: str = "", tags: list[str] | None = None, addresses: list[str] | None = None, extends: list[str] | None = None, uses: list[str] | None = None, depends_on: list[str] | None = None, refutes: list[str] | None = None, update_on_exist: bool = False) -> Path: """Create (or update) a claims/.md node and wire its edges. The claim layer is the PROVE/JUDGE output ledger: every theorem/headline becomes a node with an HONEST ``status`` (one of ``_CLAIM_STATUSES``) and a ``provenance`` pointer to the run directory. Mirrors ``ingest_paper``: - writes claims/.md from the schema above - dedups by slug (``update_on_exist=False`` skips an existing page) - records typed edges into graph/edges.jsonl via ``add_edge``: claim --addresses_gap--> gap (--addresses G2,G10) claim --extends--> paper (--extends paper:slug) claim --uses--> paper (--uses paper:slug) claim --depends_on--> claim (--depends-on claim:slug) claim --refutes--> claim (--refutes claim:slug) - rebuilds index.md + query_pack.md, appends to log.md Bare gap ids (``G2``) and bare claim/paper slugs are auto-prefixed to the ``gap:`` / ``claim:`` / ``paper:`` node_id namespace used by the graph. """ root = Path(wiki_root) if not (root / "claims").exists(): raise RuntimeError(f"{root} is not an initialized wiki (claims/ missing). " f"Run `init` first.") if status not in _CLAIM_STATUSES: raise RuntimeError(f"unknown claim status '{status}'. " f"Valid: {sorted(_CLAIM_STATUSES)}") tags = tags or [] slug = _claim_slugify(name, slug) node_id = f"claim:{slug}" page_path = root / "claims" / f"{slug}.md" if page_path.exists() and not update_on_exist: append_log(str(root), f"add_claim: skipped existing claim " f"{page_path.name} (slug dedup)") print(f"Claim already exists: {page_path.name} (slug dedup) — skipping.") return page_path was_update = page_path.exists() # Quarantine claim body fields before persist (model-authored text that is # re-read into agent context via index/query_pack): same Layer-1 injection # hygiene as edge evidence above. Placeholder persists; raw text goes to # graph/quarantine.log for human review — nothing silently dropped. if quarantine is not None: _q_hits = [] def _q(val, field): if not val: return val safe, findings = quarantine(val, scope="strict", label=f"claim {slug}.{field}") if findings: _q_hits.append((field, findings, val)) return safe description = _q(description, "description") statement = _q(statement, "statement") scope = _q(scope, "scope") evidence = _q(evidence, "evidence") if _q_hits: qlog = root / "graph" / "quarantine.log" qlog.parent.mkdir(parents=True, exist_ok=True) with open(qlog, "a", encoding="utf-8") as f: for field, findings, raw in _q_hits: f.write(json.dumps({ "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "claim": node_id, "field": field, "findings": findings, "raw_text": raw, }, ensure_ascii=False) + "\n") print(f"⚠️ claim field(s) quarantined " f"({', '.join(f for f, _, _ in _q_hits)}); placeholder persisted, " f"raw text preserved in graph/quarantine.log for review.", file=sys.stderr) rendered = _render_claim_page(slug, name, description, status, provenance, statement, scope, evidence, tags) page_path.write_text(rendered) # Wire edges. Reuse add_edge so dedup, evidence quarantine, and the JSONL # format are all identical to paper/idea edges. def _norm(target: str, default_prefix: str) -> str: t = target.strip() if not t: return "" if ":" in t: # already a namespaced node_id (gap:G2 / paper:slug / claim:x) return t # Bare gap ids look like G2, G10; everything else takes default prefix. if default_prefix == "gap:" or re.fullmatch(r"[Gg]\d+", t): return f"gap:{t.upper()}" if re.fullmatch(r"[Gg]\d+", t) else f"{default_prefix}{t}" return f"{default_prefix}{t}" def _warn_if_dangling(nid: str) -> None: # Warn-only: dangling edges are recorded (a [[name]]-style forward # reference is legitimate), but the operator should know. if not nid: return kind, _, rest = nid.partition(":") exists = True if kind == "paper": exists = (root / "papers" / f"{rest}.md").exists() elif kind == "claim": exists = rest == slug or (root / "claims" / f"{rest}.md").exists() elif kind == "gap": gm = root / "gap_map.md" exists = gm.exists() and re.search( rf"\b{re.escape(rest)}\b", gm.read_text(encoding="utf-8")) if not exists: print(f"⚠️ add_claim: edge target {nid} not found in this wiki " f"(dangling edge recorded — create the node or fix the id).", file=sys.stderr) for tgt in (addresses or []): _tid = _norm(tgt, "gap:") _warn_if_dangling(_tid) add_edge(str(root), node_id, _tid, "addresses_gap", evidence=f"claim {slug} addresses gap") for tgt in (extends or []): _tid = _norm(tgt, "paper:") _warn_if_dangling(_tid) add_edge(str(root), node_id, _tid, "extends", evidence=f"claim {slug} extends paper") for tgt in (uses or []): _tid = _norm(tgt, "paper:") _warn_if_dangling(_tid) add_edge(str(root), node_id, _tid, "uses", evidence=f"claim {slug} uses paper") for tgt in (depends_on or []): _tid = _norm(tgt, "claim:") _warn_if_dangling(_tid) add_edge(str(root), node_id, _tid, "depends_on", evidence=f"claim {slug} depends on claim") for tgt in (refutes or []): _tid = _norm(tgt, "claim:") _warn_if_dangling(_tid) add_edge(str(root), node_id, _tid, "refutes", evidence=f"claim {slug} refutes claim") # Rebuild derived artifacts (same as ingest_paper) rebuild_index(str(root)) rebuild_query_pack(str(root)) action = "updated" if was_update else "added" append_log(str(root), f"add_claim: {action} {node_id} [status={status}]" + (f" prov={provenance}" if provenance else "")) print(f"Claim {action}: {page_path} [status={status}]") return page_path _IDEA_OUTCOMES = { "unknown", # not yet assessed "pending", # proposed / awaiting pilot or experiment "negative", # tested, failed (feeds the re-ideation banlist) "mixed", # partial result (also banlisted) "positive", # tested, succeeded } _IDEA_STAGES = {"proposed", "active", "piloted", "archived"} def _idea_slugify(name: str, slug: str = "") -> str: """Slug for an idea page (mirrors _claim_slugify): honor an explicit --slug verbatim (sanitized), else fall back to the title keyword extractor.""" if slug: s = re.sub(r"[^a-z0-9._-]+", "-", slug.strip().lower()).strip("-") if s: return s return slugify(name).lstrip("_").lstrip("0").strip("_") or "idea" def _render_idea_page(slug, title, description, stage, outcome, thesis, risks, based_on_ids, target_gap_ids, tags): """Render an ideas/.md page following the research-wiki schema. Mirrors _render_claim_page. The frontmatter `outcome` field is the one the re-ideation banlist + stats read (rebuild_query_pack / get_stats), so it must be one of _IDEA_OUTCOMES. Edges live in graph/edges.jsonl; the frontmatter based_on / target_gaps lists mirror them for human-readable provenance. """ lines = ["---"] lines.append("type: idea") lines.append(f"node_id: idea:{slug}") lines.append(f"title: {_yaml_quote(title)}") lines.append(f"stage: {stage}") lines.append(f"outcome: {outcome}") lines.append(f"added: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}") lines.append("based_on: [" + ", ".join(_yaml_quote(i) for i in based_on_ids) + "]") lines.append("target_gaps: [" + ", ".join(_yaml_quote(i) for i in target_gap_ids) + "]") lines.append("tags: [" + ", ".join(_yaml_quote(t) for t in tags) + "]") lines.append("---") lines.append("") lines.append(f"# {title}") lines.append("") lines.append(f"**stage:** `{stage}` · **outcome:** `{outcome}`") if description.strip(): lines.append("") lines.append(description.strip()) lines.append("") lines.append("## Thesis") lines.append(thesis.strip() if thesis.strip() else "_TODO: the core hypothesis / direction._") lines.append("") lines.append("## Key risks") lines.append(risks.strip() if risks.strip() else "_TODO: novelty / feasibility risks._") lines.append("") lines.append("## Connections") lines.append("_Edges are recorded in `graph/edges.jsonl`; summarize here for human readers._") lines.append("") return "\n".join(lines) + "\n" def upsert_idea(wiki_root: str, slug: str, title: str, *, description: str = "", stage: str = "proposed", outcome: str = "pending", thesis: str = "", risks: str = "", tags: list[str] | None = None, based_on: list[str] | None = None, target_gaps: list[str] | None = None, update_on_exist: bool = False) -> Path: """Create (or update) an ideas/.md node and wire its edges. The idea layer is the research-direction ledger written by /idea-creator after ideation (Phase 7). Mirrors add_claim / ingest_paper: - writes ideas/.md from the schema above - dedups by slug (update_on_exist=False SKIPS an existing page — so a re-ideation run records NEW ideas without clobbering an existing idea whose outcome /result-to-claim may have already enriched) - records typed edges into graph/edges.jsonl via add_edge: idea --inspired_by--> paper (--based-on paper:slug) idea --addresses_gap--> gap (--target-gaps G2,G10) - rebuilds index.md + query_pack.md, appends to log.md `outcome` must be one of _IDEA_OUTCOMES (it drives the re-ideation banlist + stats). NOTE: /result-to-claim updates an idea's outcome by editing the page in place (to preserve the rich body); it must NOT call this with update_on_exist (would clobber). """ root = Path(wiki_root) if not (root / "ideas").exists(): raise RuntimeError(f"{root} is not an initialized wiki (ideas/ missing). " f"Run `init` first.") if outcome not in _IDEA_OUTCOMES: raise RuntimeError(f"unknown idea outcome '{outcome}'. " f"Valid: {sorted(_IDEA_OUTCOMES)}") if stage not in _IDEA_STAGES: raise RuntimeError(f"unknown idea stage '{stage}'. " f"Valid: {sorted(_IDEA_STAGES)}") tags = tags or [] slug = _idea_slugify(title, slug) node_id = f"idea:{slug}" page_path = root / "ideas" / f"{slug}.md" if page_path.exists() and not update_on_exist: append_log(str(root), f"upsert_idea: skipped existing idea " f"{page_path.name} (slug dedup)") print(f"Idea already exists: {page_path.name} (slug dedup) — skipping.") return page_path was_update = page_path.exists() # Quarantine model-authored body fields before persist (re-read into agent # context via index/query_pack) — same Layer-1 injection hygiene as add_claim. # Title is structural (not quarantined, like add_claim's name). Placeholder # persists; raw text → graph/quarantine.log for human review. if quarantine is not None: _q_hits = [] def _q(val, field): if not val: return val safe, findings = quarantine(val, scope="strict", label=f"idea {slug}.{field}") if findings: _q_hits.append((field, findings, val)) return safe description = _q(description, "description") thesis = _q(thesis, "thesis") risks = _q(risks, "risks") if _q_hits: qlog = root / "graph" / "quarantine.log" qlog.parent.mkdir(parents=True, exist_ok=True) with open(qlog, "a", encoding="utf-8") as f: for field, findings, raw in _q_hits: f.write(json.dumps({ "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "idea": node_id, "field": field, "findings": findings, "raw_text": raw, }, ensure_ascii=False) + "\n") print(f"⚠️ idea field(s) quarantined " f"({', '.join(f for f, _, _ in _q_hits)}); placeholder persisted, " f"raw text preserved in graph/quarantine.log for review.", file=sys.stderr) def _norm(target: str, default_prefix: str) -> str: t = target.strip() if not t: return "" if ":" in t: # already a namespaced node_id return t if default_prefix == "gap:" or re.fullmatch(r"[Gg]\d+", t): return f"gap:{t.upper()}" if re.fullmatch(r"[Gg]\d+", t) else f"{default_prefix}{t}" return f"{default_prefix}{t}" def _warn_if_dangling(nid: str) -> None: if not nid: return kind, _, rest = nid.partition(":") exists = True if kind == "paper": exists = (root / "papers" / f"{rest}.md").exists() elif kind == "gap": gm = root / "gap_map.md" exists = gm.exists() and re.search( rf"\b{re.escape(rest)}\b", gm.read_text(encoding="utf-8")) if not exists: print(f"⚠️ upsert_idea: edge target {nid} not found in this wiki " f"(dangling edge recorded — create the node or fix the id).", file=sys.stderr) based_on_ids = [n for n in (_norm(t, "paper:") for t in (based_on or [])) if n] target_gap_ids = [n for n in (_norm(t, "gap:") for t in (target_gaps or [])) if n] rendered = _render_idea_page(slug, title, description, stage, outcome, thesis, risks, based_on_ids, target_gap_ids, tags) page_path.write_text(rendered) # Wire edges (reuse add_edge so dedup + JSONL format match paper/claim edges). for nid in based_on_ids: _warn_if_dangling(nid) add_edge(str(root), node_id, nid, "inspired_by", evidence=f"idea {slug} inspired by paper") for nid in target_gap_ids: _warn_if_dangling(nid) add_edge(str(root), node_id, nid, "addresses_gap", evidence=f"idea {slug} addresses gap") rebuild_index(str(root)) rebuild_query_pack(str(root)) action = "updated" if was_update else "added" append_log(str(root), f"upsert_idea: {action} {node_id} [stage={stage} outcome={outcome}]") print(f"Idea {action}: {page_path} [stage={stage} outcome={outcome}]") return page_path _EXPERIMENT_VERDICTS = {"yes", "partial", "no"} _EXPERIMENT_CONFIDENCE = {"high", "medium", "low"} def _render_experiment_page(slug, title, idea_id, verdict, confidence, date, hardware, duration, metrics, reasoning, provenance, tags): """Render an experiments/.md page (mirrors _render_claim_page). All free-form frontmatter values are _yaml_quote-wrapped (newline-injection safe); verdict/confidence are validated enums; metrics/reasoning live in the body.""" label = title.strip() or f"Experiment {slug}" lines = ["---"] lines.append("type: experiment") lines.append(f"node_id: exp:{slug}") lines.append(f"title: {_yaml_quote(label)}") lines.append(f"idea_id: {_yaml_quote(idea_id)}") lines.append(f"verdict: {verdict}") lines.append(f"confidence: {confidence}") lines.append(f"date: {_yaml_quote(date)}") lines.append(f"hardware: {_yaml_quote(hardware)}") lines.append(f"duration: {_yaml_quote(duration)}") lines.append(f"provenance: {_yaml_quote(provenance)}") lines.append(f"added: {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}") lines.append("tags: [" + ", ".join(_yaml_quote(t) for t in tags) + "]") lines.append("---") lines.append("") lines.append(f"# {label}") lines.append("") lines.append(f"**verdict:** `{verdict}` · **confidence:** `{confidence}`" + (f" · tests `{idea_id}`" if idea_id else "")) lines.append("") lines.append("## Metrics") lines.append(metrics.strip() if metrics.strip() else "_TODO: key metrics._") lines.append("") lines.append("## Reasoning") lines.append(reasoning.strip() if reasoning.strip() else "_TODO: why this verdict._") lines.append("") lines.append("## Connections") lines.append("_Edges are recorded in `graph/edges.jsonl`; summarize here for human readers._") lines.append("") return "\n".join(lines) + "\n" def add_experiment(wiki_root: str, slug: str, *, title: str = "", idea: str = "", verdict: str = "no", confidence: str = "medium", date: str = "", hardware: str = "", duration: str = "", metrics: str = "", reasoning: str = "", provenance: str = "", tags: list[str] | None = None, update_on_exist: bool = False) -> Path: """Create (or update) an experiments/.md node and wire its edge. The experiment layer is the pilot/run ledger written by /result-to-claim (the verdict owner). Mirrors add_claim / upsert_idea: - writes experiments/.md from the schema - dedups by slug (update_on_exist=False SKIPS; /result-to-claim passes --update-on-exist because re-judging the SAME exp must overwrite the stale verdict, not keep it) - records the idea --tested_by--> exp edge via add_edge (--idea idea:slug) - rebuilds index + query_pack, appends to log `verdict` ∈ _EXPERIMENT_VERDICTS, `confidence` ∈ _EXPERIMENT_CONFIDENCE. Ensures the exp: node EXISTS before /result-to-claim adds supports/invalidates edges FROM it — no dangling evidence-graph edge (edge with no node). """ root = Path(wiki_root) if not (root / "experiments").exists(): raise RuntimeError(f"{root} is not an initialized wiki (experiments/ missing). " f"Run `init` first.") if verdict not in _EXPERIMENT_VERDICTS: raise RuntimeError(f"unknown experiment verdict '{verdict}'. " f"Valid: {sorted(_EXPERIMENT_VERDICTS)}") if confidence not in _EXPERIMENT_CONFIDENCE: raise RuntimeError(f"unknown confidence '{confidence}'. " f"Valid: {sorted(_EXPERIMENT_CONFIDENCE)}") tags = tags or [] slug = re.sub(r"[^a-z0-9._-]+", "-", slug.strip().lower()).strip("-") if not slug: raise RuntimeError("experiment slug (exp id) is required and must be non-empty") node_id = f"exp:{slug}" idea_id = idea.strip() if idea_id and ":" not in idea_id: idea_id = f"idea:{idea_id}" page_path = root / "experiments" / f"{slug}.md" if page_path.exists() and not update_on_exist: append_log(str(root), f"add_experiment: skipped existing experiment " f"{page_path.name} (slug dedup)") print(f"Experiment already exists: {page_path.name} (slug dedup) — skipping.") return page_path was_update = page_path.exists() # Quarantine model-authored body fields (re-read into context) — same hygiene # as add_claim / upsert_idea. Enum + structural fields are not quarantined. if quarantine is not None: _q_hits = [] def _q(val, field): if not val: return val safe, findings = quarantine(val, scope="strict", label=f"experiment {slug}.{field}") if findings: _q_hits.append((field, findings, val)) return safe metrics = _q(metrics, "metrics") reasoning = _q(reasoning, "reasoning") if _q_hits: qlog = root / "graph" / "quarantine.log" qlog.parent.mkdir(parents=True, exist_ok=True) with open(qlog, "a", encoding="utf-8") as f: for field, findings, raw in _q_hits: f.write(json.dumps({ "ts": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"), "experiment": node_id, "field": field, "findings": findings, "raw_text": raw, }, ensure_ascii=False) + "\n") print(f"⚠️ experiment field(s) quarantined " f"({', '.join(f for f, _, _ in _q_hits)}); placeholder persisted, " f"raw text preserved in graph/quarantine.log for review.", file=sys.stderr) rendered = _render_experiment_page(slug, title, idea_id, verdict, confidence, date, hardware, duration, metrics, reasoning, provenance, tags) page_path.write_text(rendered) # Wire the idea --tested_by--> exp edge (idea side owns it, per the schema). if idea_id: if idea_id.startswith("idea:") and not (root / "ideas" / f"{idea_id.split(':', 1)[1]}.md").exists(): print(f"⚠️ add_experiment: idea {idea_id} not found in this wiki " f"(dangling edge recorded — create the node or fix the id).", file=sys.stderr) add_edge(str(root), idea_id, node_id, "tested_by", evidence=f"exp {slug} tests idea") rebuild_index(str(root)) rebuild_query_pack(str(root)) action = "updated" if was_update else "added" append_log(str(root), f"add_experiment: {action} {node_id} [verdict={verdict} confidence={confidence}]") print(f"Experiment {action}: {page_path} [verdict={verdict} confidence={confidence}]") return page_path def sync_papers(wiki_root: str, arxiv_ids: list[str], update_on_exist: bool = False) -> None: """Batch backfill: ingest many arxiv ids with a SINGLE metadata request. All ids are fetched in one ``id_list`` call (see fetch_arxiv_metadata_batch), then each page is written from the pre-fetched metadata — N papers cost 1 arXiv request instead of N, avoiding the burst-429 problem. Ids the batch did not return fall back to a per-id fetch (handles the occasional miss). Dedup is still handled per-id inside ingest_paper. """ ids = [a.strip() for a in arxiv_ids if a and a.strip()] if not ids: return try: batch = fetch_arxiv_metadata_batch(ids) except RuntimeError as e: print(f"Warning: batch fetch failed ({e}); falling back to per-id.", file=sys.stderr) batch = {} errors = [] for aid in ids: norm = _normalize_arxiv_id(aid) meta = batch.get(norm) try: ingest_paper(wiki_root, arxiv_id=aid, update_on_exist=update_on_exist, prefetched_meta=meta) except RuntimeError as e: print(f"ERROR: {aid}: {e}", file=sys.stderr) errors.append((aid, str(e))) if errors: print(f"\nsync: {len(errors)} error(s)", file=sys.stderr) sys.exit(1) def rebuild_index(wiki_root: str) -> None: """Regenerate index.md from wiki entity files.""" root = Path(wiki_root) lines = ["# Research Wiki Index", "", "_Auto-generated by `research_wiki.py rebuild_index`. Do not edit._", ""] for subdir, header in [("papers", "Papers"), ("ideas", "Ideas"), ("experiments", "Experiments"), ("claims", "Claims")]: d = root / subdir if not d.exists(): continue entries = [] for f in sorted(d.glob("*.md")): meta = _load_paper_frontmatter(f) node_id = meta.get("node_id", f.stem) # Claims use `name:` (papers use `title:`); fall back across both # so each node type renders its human label without special-casing. title = meta.get("title") or meta.get("name") or f.stem year = meta.get("year", "") # Surface the claim's honesty status inline (papers have no status). status = meta.get("status", "") suffix = f" ({year})" if year else (f" [{status}]" if status else "") entries.append(f"- `{node_id}` — {title}{suffix}") if entries: lines.append(f"## {header} ({len(entries)})") lines.extend(entries) lines.append("") (root / "index.md").write_text("\n".join(lines) + "\n") def append_log(wiki_root: str, message: str): """Append a timestamped entry to log.md.""" log_path = Path(wiki_root) / "log.md" ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") entry = f"- `{ts}` {message}\n" if log_path.exists(): with open(log_path, "a") as f: f.write(entry) else: log_path.write_text(f"# Research Wiki Log\n\n{entry}") def main(): parser = argparse.ArgumentParser(description="ARIS Research Wiki utilities") subparsers = parser.add_subparsers(dest="command") # init p_init = subparsers.add_parser("init") p_init.add_argument("wiki_root") # slug p_slug = subparsers.add_parser("slug") p_slug.add_argument("title") p_slug.add_argument("--author", default="") p_slug.add_argument("--year", type=int, default=0) # add_edge p_edge = subparsers.add_parser("add_edge") p_edge.add_argument("wiki_root") p_edge.add_argument("--from", dest="from_id", required=True) p_edge.add_argument("--to", dest="to_id", required=True) p_edge.add_argument("--type", dest="edge_type", required=True) p_edge.add_argument("--evidence", default="") # rebuild_query_pack p_qp = subparsers.add_parser("rebuild_query_pack") p_qp.add_argument("wiki_root") p_qp.add_argument("--max-chars", type=int, default=8000) # rebuild_index p_idx = subparsers.add_parser("rebuild_index") p_idx.add_argument("wiki_root") # stats p_stats = subparsers.add_parser("stats") p_stats.add_argument("wiki_root") # log p_log = subparsers.add_parser("log") p_log.add_argument("wiki_root") p_log.add_argument("message") # ingest_paper — the canonical ingest entrypoint called by integration hooks p_ing = subparsers.add_parser("ingest_paper", help="Create (or update) a papers/.md page") p_ing.add_argument("wiki_root") p_ing.add_argument("--arxiv-id", default="", help="arXiv identifier (2501.12345 or with v2); metadata auto-fetched") p_ing.add_argument("--title", default="", help="Paper title; required when --arxiv-id is absent") p_ing.add_argument("--authors", default="", help='Comma-separated author list, e.g. "Alice Smith, Bob Jones"') p_ing.add_argument("--year", type=int, default=0) p_ing.add_argument("--venue", default="") p_ing.add_argument("--external-id-doi", dest="doi", default="") p_ing.add_argument("--thesis", default="", help="One-line thesis; otherwise left as TODO for later enrichment") p_ing.add_argument("--tags", default="", help="Comma-separated tag list") p_ing.add_argument("--update-on-exist", action="store_true", help="Overwrite an existing page instead of skipping (default: skip)") # add_claim — create a claim node (PROVE/JUDGE output ledger) p_claim = subparsers.add_parser("add_claim", help="Create (or update) a claims/.md node") p_claim.add_argument("wiki_root") p_claim.add_argument("--slug", default="", help="Stable claim id, e.g. b1-main-ub (honored verbatim)") p_claim.add_argument("--name", required=True, help="Human-readable claim name/headline") p_claim.add_argument("--description", default="", help="One-line description for the index/frontmatter") p_claim.add_argument("--status", default="drafted", help="One of: " + ", ".join(sorted(_CLAIM_STATUSES))) p_claim.add_argument("--provenance", default="", help="Run directory that produced this claim (honesty receipt)") p_claim.add_argument("--statement", default="", help="Formal statement (body)") p_claim.add_argument("--scope", default="", help="Honest scope: what the claim does NOT say / banned wordings") p_claim.add_argument("--evidence", default="", help="Evidence chain: obligations, verdicts, provenance pointers") p_claim.add_argument("--tags", default="", help="Comma-separated tag list") p_claim.add_argument("--addresses", default="", help="Comma-separated gap ids this claim addresses, e.g. G2,G10") p_claim.add_argument("--extends", default="", help="Comma-separated paper node_ids/slugs this claim extends") p_claim.add_argument("--uses", default="", help="Comma-separated paper node_ids/slugs this claim uses") p_claim.add_argument("--depends-on", dest="depends_on", default="", help="Comma-separated claim node_ids/slugs this claim depends on") p_claim.add_argument("--refutes", default="", help="Comma-separated claim node_ids/slugs this claim refutes") p_claim.add_argument("--update-on-exist", action="store_true", help="Overwrite an existing claim instead of skipping (default: skip)") # upsert_idea — create/update an idea node (idea-creator Phase 7 write-back) p_idea = subparsers.add_parser("upsert_idea", help="Create (or update) an ideas/.md node") p_idea.add_argument("wiki_root") p_idea.add_argument("--slug", default="", help="Stable idea id (honored verbatim); else derived from --title") p_idea.add_argument("--title", required=True, help="Human-readable idea title") p_idea.add_argument("--description", default="", help="One-line description for the index/frontmatter") p_idea.add_argument("--stage", default="proposed", help="proposed | active | piloted | archived") p_idea.add_argument("--outcome", default="pending", help="One of: " + ", ".join(sorted(_IDEA_OUTCOMES))) p_idea.add_argument("--thesis", default="", help="Core hypothesis / direction (body)") p_idea.add_argument("--risks", default="", help="Novelty / feasibility risks (body)") p_idea.add_argument("--tags", default="", help="Comma-separated tag list") p_idea.add_argument("--based-on", dest="based_on", default="", help="Comma-separated paper node_ids/slugs that inspired this idea") p_idea.add_argument("--target-gaps", dest="target_gaps", default="", help="Comma-separated gap ids this idea addresses, e.g. G2,G10") p_idea.add_argument("--update-on-exist", action="store_true", help="Overwrite an existing idea instead of skipping (default: skip)") # add_experiment — create/update an experiment node (result-to-claim Step 5 #1) p_exp = subparsers.add_parser("add_experiment", help="Create (or update) an experiments/.md node") p_exp.add_argument("wiki_root") p_exp.add_argument("--slug", required=True, help="Stable experiment id, e.g. exp-001") p_exp.add_argument("--title", default="", help="Human-readable label (default: 'Experiment ')") p_exp.add_argument("--idea", default="", help="Idea node_id/slug this experiment tests (wires idea --tested_by--> exp)") p_exp.add_argument("--verdict", default="no", help="One of: " + ", ".join(sorted(_EXPERIMENT_VERDICTS))) p_exp.add_argument("--confidence", default="medium", help="One of: " + ", ".join(sorted(_EXPERIMENT_CONFIDENCE))) p_exp.add_argument("--date", default="", help="Run date") p_exp.add_argument("--hardware", default="", help="Hardware used") p_exp.add_argument("--duration", default="", help="Wall-clock / GPU-hours") p_exp.add_argument("--metrics", default="", help="Key metrics (body)") p_exp.add_argument("--reasoning", default="", help="Why this verdict (body)") p_exp.add_argument("--provenance", default="", help="Run dir / EXPERIMENT_AUDIT pointer (honesty receipt)") p_exp.add_argument("--tags", default="", help="Comma-separated tag list") p_exp.add_argument("--update-on-exist", action="store_true", help="Overwrite an existing experiment (the verdict owner /result-to-claim passes this on a re-judge)") # sync — batch backfill p_sync = subparsers.add_parser("sync", help="Batch ingest from a list of arXiv IDs") p_sync.add_argument("wiki_root") p_sync.add_argument("--arxiv-ids", default="", help="Comma-separated list of arXiv IDs") p_sync.add_argument("--from-file", default="", help="Path to a newline-delimited file of arXiv IDs (# comments ok)") p_sync.add_argument("--update-on-exist", action="store_true") args = parser.parse_args() if args.command == "init": init_wiki(args.wiki_root) elif args.command == "slug": print(slugify(args.title, args.author, args.year)) elif args.command == "add_edge": add_edge(args.wiki_root, args.from_id, args.to_id, args.edge_type, args.evidence) elif args.command == "rebuild_query_pack": rebuild_query_pack(args.wiki_root, args.max_chars) elif args.command == "rebuild_index": rebuild_index(args.wiki_root) elif args.command == "stats": get_stats(args.wiki_root) elif args.command == "log": append_log(args.wiki_root, args.message) elif args.command == "ingest_paper": authors = [a.strip() for a in args.authors.split(",") if a.strip()] tags = [t.strip() for t in args.tags.split(",") if t.strip()] ingest_paper(args.wiki_root, arxiv_id=args.arxiv_id, title=args.title, authors=authors, year=args.year, venue=args.venue, doi=args.doi, thesis=args.thesis, tags=tags, update_on_exist=args.update_on_exist) elif args.command == "add_claim": def _split(s: str) -> list[str]: return [x.strip() for x in s.split(",") if x.strip()] add_claim(args.wiki_root, args.slug, args.name, description=args.description, status=args.status, provenance=args.provenance, statement=args.statement, scope=args.scope, evidence=args.evidence, tags=_split(args.tags), addresses=_split(args.addresses), extends=_split(args.extends), uses=_split(args.uses), depends_on=_split(args.depends_on), refutes=_split(args.refutes), update_on_exist=args.update_on_exist) elif args.command == "upsert_idea": def _spliti(s: str) -> list[str]: return [x.strip() for x in s.split(",") if x.strip()] upsert_idea(args.wiki_root, args.slug, args.title, description=args.description, stage=args.stage, outcome=args.outcome, thesis=args.thesis, risks=args.risks, tags=_spliti(args.tags), based_on=_spliti(args.based_on), target_gaps=_spliti(args.target_gaps), update_on_exist=args.update_on_exist) elif args.command == "add_experiment": add_experiment(args.wiki_root, args.slug, title=args.title, idea=args.idea, verdict=args.verdict, confidence=args.confidence, date=args.date, hardware=args.hardware, duration=args.duration, metrics=args.metrics, reasoning=args.reasoning, provenance=args.provenance, tags=[x.strip() for x in args.tags.split(",") if x.strip()], update_on_exist=args.update_on_exist) elif args.command == "sync": ids: list[str] = [] if args.arxiv_ids: ids.extend([i.strip() for i in args.arxiv_ids.split(",") if i.strip()]) if args.from_file: fp = Path(args.from_file) if not fp.exists(): print(f"--from-file not found: {fp}", file=sys.stderr) sys.exit(2) for line in fp.read_text().splitlines(): line = line.strip() if line and not line.startswith("#"): ids.append(line) if not ids: print("sync: no arxiv ids supplied (use --arxiv-ids or --from-file)", file=sys.stderr) sys.exit(2) # Dedup the id list before we hit the network seen: set[str] = set() uniq_ids: list[str] = [] for i in ids: key = _normalize_arxiv_id(i) if key in seen: continue seen.add(key) uniq_ids.append(i) print(f"sync: {len(uniq_ids)} unique arxiv id(s)") sync_papers(args.wiki_root, uniq_ids, update_on_exist=args.update_on_exist) else: parser.print_help() if __name__ == "__main__": main()