# Week 1: Parallel Agent Execution Plan

## Overview

Launch 10 independent agents to build the cognitive memory schema foundation. All agents work in the same worktree with file-level isolation.

**Epic:** cognitive-memory-schema
**Worktree:** `../epic-cognitive-memory`
**Branch:** `epic/cognitive-memory`

---

## Infrastructure

**Canonical Truth:** Supabase Storage (JSON files)
**Graph Index:** Neo4j Aura Free
**Authentication:** Supabase Auth
**Session State:** Supabase Postgres (sessions, prompts, sync metadata only)

---

## Pre-Launch Checklist

```bash
# 1. Ensure main is up to date
git checkout main
git pull origin main

# 2. Create worktree with branch
git worktree add ../epic-cognitive-memory -b epic/cognitive-memory

# 3. Push branch to remote
cd ../epic-cognitive-memory
git push -u origin epic/cognitive-memory

# 4. Verify worktree
git worktree list
```

### External Services Setup

**Supabase:**
1. Create project at https://supabase.com
2. Create storage bucket named `canonical` (private)
3. Note: Project URL, Anon Key, Service Role Key

**Neo4j Aura:**
1. Create free instance at https://neo4j.com/cloud/aura-free/
2. Note: Connection URI, Username, Password

---

## Agent Assignments

### Agent 1: Preference Memory Schemas

**Scope:** Create preference and core values schemas as Pydantic models

**Files:**
- `backend/memory/schemas/preference.py` (NEW)

**Deliverables:**
```python
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class PreferenceRecord(CanonicalRecord):
    """User preferences - likes, dislikes, habits.
    Serializes to JSON in Storage."""
    type_: str = Field(alias='@type', default='Preference')
    preference_type: str  # like, dislike, value, habit, aversion
    category: str  # food, communication, work, leisure, relationships
    subject: str  # what the preference is about
    description: str  # detailed description
    strength: str  # strong, moderate, mild
    confidence: float  # 0.0-1.0
    source_type: str  # stated, inferred, observed
    is_verified: bool  # user confirmed
    last_accessed_at: Optional[datetime] = None
    access_count: int = 0
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class CoreValueRecord(CanonicalRecord):
    """User's core values and priorities. Serializes to JSON in Storage."""
    type_: str = Field(alias='@type', default='CoreValue')
    value_name: str  # honesty, family, achievement, creativity
    description: str
    priority_rank: int  # 1 = highest priority
    confidence: float
    source_type: str
    is_verified: bool
    supporting_evidence: List[RefLink] = []
    claims: List[RefLink] = []
    embedding: Optional[List[float]] = None
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Preference memory layer schemas following the Technical Spec.

1. Read the base schema for patterns:
   - backend/memory/schemas/base.py (CanonicalRecord base class)

2. Create backend/memory/schemas/preference.py with:
   - PreferenceRecord model (likes, dislikes, habits)
   - CoreValueRecord model (user's core values)
   - All fields from Technical Spec
   - Pydantic model that serializes to JSON-LD format
   - RefLink references for claims and evidence
   - Optional embedding field for vector indexing

3. Update backend/memory/schemas/__init__.py to export new schemas

4. Commit: "Add Preference memory layer schemas"
```

---

### Agent 2: Decision Memory Schemas

**Scope:** Create decision tracking schemas as Pydantic models

**Files:**
- `backend/memory/schemas/decision.py` (NEW)

**Deliverables:**
```python
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class DecisionRecord(CanonicalRecord):
    """Records of user decisions with context and reasoning."""
    type_: str = Field(alias='@type', default='Decision')
    decision_date: datetime
    domain: str  # career, financial, personal, health, relationships
    summary: str  # brief description
    # Named decision_context rather than context so it does not shadow
    # CanonicalRecord.context (the JSON-LD @context field)
    decision_context: str  # situation that led to decision
    decision_made: str  # what was decided
    alternatives_considered: Dict[str, Any] = {}
    reasoning: str  # why this was chosen
    outcome: Optional[str] = None  # what happened
    outcome_evaluation: Optional[str] = None  # positive, negative, neutral, mixed
    lessons_learned: Optional[str] = None
    significance: str  # major, moderate, minor
    confidence: float
    source_type: str
    is_verified: bool
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class DecisionPatternRecord(CanonicalRecord):
    """Patterns identified across multiple decisions."""
    type_: str = Field(alias='@type', default='DecisionPattern')
    domain: str
    pattern_name: str
    pattern_description: str
    supporting_decisions: List[RefLink] = []  # Decision URN references
    confidence: float
    first_observed: datetime
    last_observed: datetime
    occurrence_count: int
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Decision memory layer schemas following the Technical Spec.

1. Read the base schema for patterns:
   - backend/memory/schemas/base.py (CanonicalRecord base class)

2. Create backend/memory/schemas/decision.py with:
   - DecisionRecord model (individual decisions)
   - DecisionPatternRecord model (patterns across decisions)
   - Dict field for alternatives_considered
   - RefLink references for related decisions
   - Pydantic model that serializes to JSON-LD format

3. Update backend/memory/schemas/__init__.py to export new schemas

4. Commit: "Add Decision memory layer schemas"
```

---

### Agent 3: Semantic Knowledge Schemas

**Scope:** Create semantic memory schemas for facts and knowledge domains

**Files:**
- `backend/memory/schemas/semantic.py` (NEW)

**Deliverables:**
```python
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class SemanticKnowledgeRecord(CanonicalRecord):
    """Facts and knowledge the user possesses."""
    type_: str = Field(alias='@type', default='SemanticKnowledge')
    domain: str  # technology, finance, cooking, music, etc.
    topic: str  # specific topic within domain
    fact: str  # the knowledge statement
    fact_type: str  # definition, relationship, procedure, rule
    confidence: float
    source_type: str  # learned, inferred, stated, extracted
    is_verified: bool
    related_knowledge: List[RefLink] = []  # URN references to related knowledge
    last_accessed_at: Optional[datetime] = None
    access_count: int = 0
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
    embedding: Optional[List[float]] = None

class KnowledgeDomainRecord(CanonicalRecord):
    """Aggregated view of user expertise in a domain."""
    type_: str = Field(alias='@type', default='KnowledgeDomain')
    domain_name: str
    description: str
    expertise_level: str  # novice, familiar, proficient, expert
    knowledge_count: int
    first_learned: datetime
    last_updated: datetime
    confidence: float
    knowledge_items: List[RefLink] = []  # URN references to SemanticKnowledge
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Semantic memory layer
schemas following the Technical Spec.

1. Read the base schema for patterns:
   - backend/memory/schemas/base.py (CanonicalRecord base class)

2. Create backend/memory/schemas/semantic.py with:
   - SemanticKnowledgeRecord model (individual facts)
   - KnowledgeDomainRecord model (expertise areas)
   - RefLink references for related knowledge
   - Temporal dynamics fields (access tracking)
   - Pydantic model that serializes to JSON-LD format

3. Update backend/memory/schemas/__init__.py to export new schemas

4. Commit: "Add Semantic memory layer schemas"
```

---

### Agent 4: Autobiographical Narrative Schemas

**Scope:** Create narrative synthesis schemas

**Files:**
- `backend/memory/schemas/narrative.py` (NEW)

**Deliverables:**
```python
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class AutobiographicalNarrativeRecord(CanonicalRecord):
    """Synthesized identity narrative from all memory layers."""
    type_: str = Field(alias='@type', default='AutobiographicalNarrative')
    version: int  # increments with each update
    identity_summary: str  # "Who am I?" synthesis
    life_chapters: Dict[str, Any] = {}  # major life periods
    key_relationships: Dict[str, Any] = {}  # important people
    core_values: Dict[str, Any] = {}  # derived from CoreValue
    defining_experiences: Dict[str, Any] = {}  # formative events
    current_focus: str  # what user is focused on now
    aspirations: str  # future goals
    challenges: str  # current struggles
    is_current: bool  # most recent version
    generated_at: datetime
    source_data_cutoff: datetime  # data up to this point
    confidence: float
    chapter_refs: List[RefLink] = []  # URN references to NarrativeChapter

class NarrativeChapterRecord(CanonicalRecord):
    """Individual life chapter within narrative."""
    type_: str = Field(alias='@type', default='NarrativeChapter')
    narrative_ref: RefLink  # URN reference to parent narrative
    chapter_name: str
    time_period: str  # e.g., "2020-2023"
    start_date: datetime
    end_date: Optional[datetime] = None  # nullable for current
    summary: str
    key_events: List[RefLink] = []  # URN references to Episodes
    key_people: List[RefLink] = []  # URN references to Persons
    themes: List[str] = []
    lessons: str
    order_index: int
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Autobiographical Narrative schemas following the Technical Spec.

1. Read the base schema for patterns:
   - backend/memory/schemas/base.py (CanonicalRecord base class)

2. Create backend/memory/schemas/narrative.py with:
   - AutobiographicalNarrativeRecord model (identity synthesis)
   - NarrativeChapterRecord model (life periods)
   - Dict fields for complex nested data
   - Version tracking for narrative updates
   - RefLink references to Person, Episode records
   - Pydantic model that serializes to JSON-LD format

3. Update backend/memory/schemas/__init__.py to export new schemas

4. Commit: "Add Autobiographical Narrative schemas"
```

---

### Agent 5: Reasoning Pattern Schemas

**Scope:** Create reasoning and communication profile schemas

**Files:**
- `backend/memory/schemas/reasoning.py` (NEW)

**Deliverables:**
```python
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from backend.memory.schemas.base import CanonicalRecord, RefLink

class ReasoningPatternRecord(CanonicalRecord):
    """Patterns in how user thinks and solves problems."""
    type_: str = Field(alias='@type', default='ReasoningPattern')
    pattern_type: str  # argumentation, problem_solving, risk_assessment
    pattern_name: str
    description: str
    examples: List[str] = []  # example instances
    counter_examples: List[str] = []  # exceptions
    confidence: float
    source_type: str
    is_verified: bool
    first_observed: datetime
    last_observed: datetime
    occurrence_count: int
    claims: List[RefLink] = []
    evidence: List[RefLink] = []

class CommunicationProfileRecord(CanonicalRecord):
    """User's communication style and preferences."""
    type_: str = Field(alias='@type', default='CommunicationProfile')
    formality_level: str  # formal, casual, adaptive
    directness: str  # direct, diplomatic, balanced
    detail_preference: str  # high_detail, summary, context_dependent
    tone: List[str] = []  # warm, professional, analytical
    vocabulary_level: str  # simple, moderate, advanced, technical
    preferred_channels: List[str] = []  # email, chat, call
    response_speed: str  # quick, thoughtful, varies
    confidence: float
    is_verified: bool
    claims: List[RefLink] = []
    evidence: List[RefLink] = []
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Reasoning Pattern schemas following the Technical Spec.

1. Read the base schema for patterns:
   - backend/memory/schemas/base.py (CanonicalRecord base class)

2. Create backend/memory/schemas/reasoning.py with:
   - ReasoningPatternRecord model (thinking patterns)
   - CommunicationProfileRecord model (communication style)
   - List fields for examples and preferences
   - Temporal tracking for pattern observation
   - Pydantic model that serializes to JSON-LD format

3. Update backend/memory/schemas/__init__.py to export new schemas

4. Commit: "Add Reasoning Pattern schemas"
```

---

### Agent 6: LLM Abstraction Layer

**Scope:** Create LLM provider abstraction for cloud + local support

**Files:**
- `backend/llm/__init__.py` (NEW directory init)
- `backend/llm/provider.py` (NEW)
- `backend/llm/providers/__init__.py` (NEW)
- `backend/llm/providers/openai.py` (NEW)
- `backend/llm/providers/ollama.py` (NEW)

**Deliverables:**
```python
# provider.py
from abc import ABC, abstractmethod
from typing import List, Dict, Any, AsyncIterator, Optional

class LLMProvider(ABC):
    """Abstract base class for LLM providers."""

    @abstractmethod
    async def generate(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
    ) -> str:
        """Generate a completion."""
        pass

    @abstractmethod
    async def generate_stream(
        self,
        prompt: str,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        max_tokens: int = 1000,
    ) -> AsyncIterator[str]:
        """Stream a completion."""
        pass

    @abstractmethod
    async def embed(self, text: str) -> List[float]:
        """Generate embeddings."""
        pass

def get_llm_provider(provider: Optional[str] = None) -> LLMProvider:
    """Factory to get configured LLM provider."""
    pass

# providers/openai.py
class OpenAIProvider(LLMProvider):
    """OpenAI API implementation."""
    pass

# providers/ollama.py
class OllamaProvider(LLMProvider):
    """Ollama local model implementation."""
    pass
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the LLM abstraction layer for hybrid cloud/local support.

1. Read existing config for patterns:
   - backend/config.py (settings patterns)

2. Create directory structure:
   - backend/llm/__init__.py
   - backend/llm/providers/__init__.py

3. Create backend/llm/provider.py with:
   - Abstract LLMProvider base class
   - generate() - single completion
   - generate_stream() - streaming completion
   - embed() - embeddings (for vector indexing)
   - Factory function get_llm_provider()

4. Create backend/llm/providers/openai.py with:
   - OpenAIProvider class
   - Uses openai async client
   - Reads API key from config

5. Create backend/llm/providers/ollama.py with:
   - OllamaProvider class
   - HTTP calls to local Ollama server
   - Configurable model and host

6. Update backend/config.py to add:
   - llm_provider: str (openai, ollama)
   - ollama_host: str
   - ollama_model: str

7. Commit: "Add LLM abstraction layer with OpenAI and Ollama support"
```

---

### Agent 7: Frontend Shell

**Scope:** Create chat and dashboard page layouts with mock data

**Files:**
- `frontend/src/pages/chat.tsx` (NEW)
- `frontend/src/pages/dashboard.tsx` (MODIFY - add memory stats)
- `frontend/src/components/chat/ChatMessage.tsx` (NEW)
- `frontend/src/components/chat/EvidenceCard.tsx` (NEW)
- `frontend/src/components/chat/ChatInput.tsx` (NEW)

**Deliverables:**
```tsx
// chat.tsx - Main chat interface
export default function ChatPage() {
  // Message list with evidence citations
  // Input field with send button
  // Streaming response display
  // Evidence cards (expandable sources)
}

// ChatMessage.tsx - Individual message component
export function ChatMessage({ role, content, sources }) {
  // User/Assistant message styling
  // Source citations inline
  // Expandable evidence cards
}

// EvidenceCard.tsx - Source evidence display
export function EvidenceCard({ source }) {
  // Source type icon
  // Title and snippet
  // Confidence indicator
  // Link to original
}
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the frontend chat and dashboard shell with mock data.

1. Read existing frontend patterns:
   - frontend/src/pages/onboarding.tsx (page patterns)
   - frontend/src/components/ (component patterns)

2. Create frontend/src/pages/chat.tsx with:
   - Full-height chat layout
   - Message list component
   - Input field with send button
   - Mock conversation data
   - Evidence cards for sources
   - Streaming response placeholder

3. Create frontend/src/components/chat/:
   - ChatMessage.tsx - message display
   - EvidenceCard.tsx - source citation card
   - ChatInput.tsx - input with send

4. Update frontend/src/pages/dashboard.tsx to add:
   - Memory stats cards (People, Events, Knowledge)
   - Quick query input
   - Recent prompts section (mock)

5. Add routes in App.tsx for /chat

6. Commit: "Add chat and dashboard UI shell"
```

---

### Agent 8: Canonical Base Types

**Scope:** Create base Pydantic models and utility functions for canonical JSON-LD format

**Files:**
- `backend/memory/__init__.py` (NEW directory init)
- `backend/memory/schemas/__init__.py` (NEW directory init)
- `backend/memory/schemas/base.py` (NEW)
- `backend/memory/schemas/evidence.py` (NEW)
- `backend/memory/schemas/claim.py` (NEW)

**Deliverables:**
```python
# base.py
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from uuid import uuid4

SCHEMA_CONTEXT = "https://continuum.dev/schema/v1"

def create_urn(record_type: str, record_id: Optional[str] = None) -> str:
    """Create a URN identifier for a record."""
    if record_id is None:
        record_id = str(uuid4())
    return f"urn:continuum:{record_type.lower()}:{record_id}"

def extract_id_from_urn(urn: str) -> str:
    """Extract the ID portion from a URN."""
    return urn.split(':')[-1]

class RefLink(BaseModel):
    """Reference link to another record."""
    ref: str = Field(alias='@ref')

    class Config:
        populate_by_name = True

class EvidenceRefLink(RefLink):
    """Reference link to evidence with type."""
    type: str

class CanonicalRecord(BaseModel):
    """Base class for all canonical memory records."""
    context: str = Field(alias='@context',
                         default=SCHEMA_CONTEXT)
    type_: str = Field(alias='@type')
    id_: str = Field(alias='@id')
    user_id: str
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)
    version: int = 1

    class Config:
        populate_by_name = True

    def to_storage_path(self) -> str:
        """Generate the storage path for this record."""
        record_type = self.type_.lower()
        record_id = extract_id_from_urn(self.id_)
        return f"users/{self.user_id}/memories/{record_type}/{record_id}.json"

# evidence.py
class EvidenceRecord(CanonicalRecord):
    """Link to source data that supports a claim."""
    type_: str = Field(alias='@type', default='Evidence')
    source_type: str  # email, calendar, contact, document, user_input
    source_integration: str  # google, microsoft, manual
    source_id: str  # ID from the source system
    raw_data_ref: str  # Path to raw file in storage
    extracted_text: Optional[str] = None
    occurred_at: datetime
    metadata: dict = {}

# claim.py
class ClaimRecord(CanonicalRecord):
    """An atomic assertion about a memory with evidence."""
    type_: str = Field(alias='@type', default='Claim')
    claim_type: str  # fact, relationship, preference, event, decision, pattern
    subject_type: str  # Person, Episode, etc.
    subject_id: str  # URN of the subject
    claim_text: str  # The assertion
    confidence: float  # 0.0-1.0
    verification_status: str  # inferred, suggested, verified, rejected, corrected
    evidence: List[EvidenceRefLink] = []
    inference_chain: Optional[str] = None  # How this was derived
    verified_at: Optional[datetime] = None
    verified_by: Optional[str] = None  # user, system
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the canonical base types that all memory schemas inherit from.

1. Create directory structure:
   - backend/memory/__init__.py
   - backend/memory/schemas/__init__.py

2. Create backend/memory/schemas/base.py with:
   - SCHEMA_CONTEXT constant
   - create_urn() function to generate URN identifiers
   - extract_id_from_urn() function
   - RefLink model for references between records
   - EvidenceRefLink model with type field
   - CanonicalRecord base class with:
     - @context, @type, @id fields (JSON-LD)
     - user_id, created_at, updated_at, version
     - to_storage_path() method
   - Pydantic config for JSON-LD serialization

3. Create backend/memory/schemas/evidence.py with:
   - EvidenceRecord model for source data links

4. Create backend/memory/schemas/claim.py with:
   - ClaimRecord model for provenance assertions

5. Commit: "Add canonical base types for JSON-LD serialization"
```

---

### Agent 9: Storage Abstraction Service

**Scope:** Create storage abstraction layer for Supabase Storage with future S3 migration support

**Files:**
- `backend/storage/__init__.py` (NEW directory init)
- `backend/storage/interface.py` (NEW)
- `backend/storage/supabase.py` (NEW)
- `backend/storage/s3.py` (NEW - stub for future)

**Deliverables:**
```python
# interface.py
from abc import ABC, abstractmethod
from typing import List, AsyncGenerator, Optional, TypeVar, Type
from backend.memory.schemas.base import CanonicalRecord

T = TypeVar('T', bound=CanonicalRecord)

class StorageService(ABC):
    """Abstract interface for canonical storage. Swappable between Supabase and S3."""

    @abstractmethod
    async def write_memory(self, record: CanonicalRecord) -> str:
        """Write a memory record to storage.
        Returns storage path."""
        pass

    @abstractmethod
    async def read_memory(
        self,
        user_id: str,
        memory_type: str,
        memory_id: str,
        schema_class: Type[T]
    ) -> Optional[T]:
        """Read a memory record from storage."""
        pass

    @abstractmethod
    async def list_memories(
        self,
        user_id: str,
        memory_type: str
    ) -> List[str]:
        """List all memory IDs of a type for a user."""
        pass

    @abstractmethod
    async def delete_memory(
        self,
        user_id: str,
        memory_type: str,
        memory_id: str
    ) -> bool:
        """Delete a memory record."""
        pass

    @abstractmethod
    async def write_event_log(
        self,
        user_id: str,
        event: dict
    ) -> str:
        """Write to the immutable event log."""
        pass

    @abstractmethod
    async def export_user_data(
        self,
        user_id: str
    ) -> AsyncGenerator[dict, None]:
        """Export all user data as JSON objects."""
        pass

def get_storage_service() -> StorageService:
    """Factory to get configured storage service."""
    pass

# supabase.py
from supabase import create_client, Client
from backend.memory.schemas.base import CanonicalRecord
from backend.storage.interface import StorageService

class SupabaseStorageService(StorageService):
    """Supabase Storage implementation."""

    def __init__(self, supabase_url: str, supabase_key: str, bucket: str = "canonical"):
        self.client: Client = create_client(supabase_url, supabase_key)
        self.bucket = bucket

    async def write_memory(self, record: CanonicalRecord) -> str:
        path = record.to_storage_path()
        data = record.model_dump_json(by_alias=True, indent=2)
        self.client.storage.from_(self.bucket).upload(
            path,
            data.encode(),
            {"content-type": "application/json", "upsert": "true"}
        )
        # Write event log entry
        await self._write_event(record.user_id, "MemoryCreated", record)
        return path

    # ...
    # implement all abstract methods

# s3.py (stub for future migration)
class S3StorageService(StorageService):
    """AWS S3 implementation - for future migration."""

    def __init__(self, bucket: str, region: str = "us-east-1"):
        # To be implemented when migrating to S3
        raise NotImplementedError("S3 storage not yet implemented")
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the storage abstraction layer for canonical JSON files.

1. Create directory structure:
   - backend/storage/__init__.py

2. Create backend/storage/interface.py with:
   - StorageService abstract base class
   - write_memory() - write record to storage
   - read_memory() - read record from storage
   - list_memories() - list all records of a type
   - delete_memory() - delete a record
   - write_event_log() - append to event log
   - export_user_data() - export all user data
   - get_storage_service() factory function

3. Create backend/storage/supabase.py with:
   - SupabaseStorageService implementation
   - Uses Supabase Storage client
   - Writes JSON with proper content-type
   - Implements event logging
   - Handles paths: users/{user_id}/memories/{type}/{id}.json

4. Create backend/storage/s3.py with:
   - S3StorageService stub (raises NotImplementedError)
   - Document the migration path to AWS S3

5. Update backend/config.py to add:
   - storage_provider: str (supabase, s3)
   - supabase_url: str
   - supabase_key: str
   - storage_bucket: str

6. Commit: "Add storage abstraction layer with Supabase implementation"
```

---

### Agent 10: Neo4j Graph Service

**Scope:** Create Neo4j service for graph indexing and Cypher queries

**Files:**
- `backend/graph/__init__.py` (NEW directory init)
- `backend/graph/service.py` (NEW)
- `backend/graph/queries.py` (NEW)

**Deliverables:**
```python
# service.py
from neo4j import AsyncGraphDatabase, AsyncDriver
from typing import List, Dict, Any, Optional
from backend.memory.schemas.base import CanonicalRecord

class Neo4jService:
    """Neo4j graph database service for indexing and queries."""

    def __init__(self, uri: str, username: str, password: str):
        self.driver: AsyncDriver = AsyncGraphDatabase.driver(
            uri, auth=(username, password)
        )

    async def close(self):
        await self.driver.close()

    async def setup_indexes(self):
        """Create indexes and constraints."""
        async with self.driver.session() as session:
            # Node indexes
            await session.run("CREATE INDEX user_id IF NOT EXISTS FOR (u:User) ON (u.id)")
            await session.run("CREATE INDEX person_user IF NOT EXISTS FOR (p:Person) ON (p.user_id)")
            await session.run("CREATE INDEX person_name IF NOT EXISTS FOR (p:Person) ON (p.canonical_name)")

            # Vector indexes (Neo4j 5.11+)
            await session.run("""
                CREATE VECTOR INDEX person_embeddings IF NOT EXISTS
                FOR (p:Person) ON p.embedding
                OPTIONS {indexConfig: {
                    `vector.dimensions`: 1536,
                    `vector.similarity_function`: 'cosine'
                }}
            """)

    async def index_record(self, record: CanonicalRecord) -> None:
        """Index a canonical record as a graph node."""
        record_type = record.type_
        # exclude takes field names, not aliases, so @context is excluded as 'context'
        properties = record.model_dump(by_alias=True, exclude={'context'})

        async with self.driver.session() as session:
            await session.run(
                f"""
                MERGE (n:{record_type} {{id: $id}})
                SET n += $properties
                WITH n
                MATCH (u:User {{id: $user_id}})
                MERGE (u)-[:OWNS]->(n)
                """,
                id=record.id_,
                user_id=record.user_id,
                properties=properties
            )

    async def create_relationship(
        self,
        from_id: str,
        to_id: str,
        relationship_type: str,
        properties: Optional[Dict[str, Any]] = None
    ) -> None:
        """Create a relationship between two nodes."""
        async with self.driver.session() as session:
            await session.run(
                f"""
                MATCH (a {{id: $from_id}})
                MATCH (b {{id: $to_id}})
                MERGE (a)-[r:{relationship_type}]->(b)
                SET r += $properties
                """,
                from_id=from_id,
                to_id=to_id,
                properties=properties or {}
            )

    async def query(self, cypher: str, params: Optional[Dict[str, Any]] = None) -> List[Dict]:
        """Execute a Cypher query and return results."""
        async with self.driver.session() as session:
            result = await session.run(cypher, params or {})
            return [record.data() async for record in result]

    async def semantic_search(
        self,
        user_id: str,
        embedding: List[float],
        node_type: str,
        limit: int = 10
    ) -> List[Dict]:
        """Search for similar nodes by embedding."""
        async with self.driver.session() as session:
            result = await session.run(
                f"""
                MATCH (u:User {{id: $user_id}})-[:OWNS]->(n:{node_type})
                WHERE n.embedding IS NOT NULL
                WITH n, vector.similarity.cosine(n.embedding, $embedding) AS score
                WHERE score > 0.7
                RETURN n, score
                ORDER BY score DESC
                LIMIT $limit
                """,
                user_id=user_id,
                embedding=embedding,
                limit=limit
            )
            return [record.data() async for record in result]

    async def rebuild_from_storage(
        self,
        user_id: str,
        records: List[CanonicalRecord]
    ) -> Dict[str, int]:
        """Rebuild graph index from canonical storage records."""
        node_count = 0
        rel_count = 0

        # Clear existing user data
        async with self.driver.session() as session:
            await session.run(
                "MATCH (u:User {id: $user_id})-[:OWNS]->(n) DETACH DELETE n",
                user_id=user_id
            )

        # Reindex all records
        for record in records:
            await self.index_record(record)
            node_count += 1

        return {"nodes": node_count, "relationships": rel_count}

# queries.py - Common query templates
WHO_IS_QUERY = """
MATCH (u:User {id: $user_id})-[:OWNS]->(p:Person)
WHERE toLower(p.canonical_name) CONTAINS toLower($name)
OPTIONAL MATCH (u)-[:OWNS]->(e:Episode)<-[:PARTICIPATED_IN]-(p)
WITH p, e
ORDER BY e.occurred_at DESC
RETURN p, collect(e)[0..5] as recent_episodes
LIMIT 1
"""

# Assumption: HOW_MET is modeled as an edge from the Person node to the Episode.
# A relationship variable (r) cannot be a pattern endpoint in Cypher, so the
# HOW_MET match starts from p rather than r.
HOW_DO_I_KNOW_QUERY = """
MATCH (u:User {id: $user_id})-[:OWNS]->(p:Person {id: $person_id})
MATCH (u)-[r:KNOWS]->(p)
OPTIONAL MATCH (u)-[:OWNS]->(first:Episode)<-[:HOW_MET]-(p)
OPTIONAL MATCH (u)-[:OWNS]->(e:Episode)<-[:PARTICIPATED_IN]-(p)
RETURN p, r, first, collect(e) as all_episodes
"""
```

**Task Prompt:**
```
Working in worktree: ../epic-cognitive-memory/
Branch: epic/cognitive-memory

Create the Neo4j graph service for indexing and Cypher queries.

1. Create directory structure:
   - backend/graph/__init__.py

2. Create backend/graph/service.py with:
   - Neo4jService class
   - Async Neo4j driver connection
   - setup_indexes() - create node and vector indexes
   - index_record() - index a canonical record as node
   - create_relationship() - create edges between nodes
   - query() - execute arbitrary Cypher
   - semantic_search() - vector similarity search
   - rebuild_from_storage() - rebuild index from canonical files

3. Create backend/graph/queries.py with:
   - WHO_IS_QUERY - find person by name with episodes
   - HOW_DO_I_KNOW_QUERY - relationship with history
   - Common query templates as constants

4. Update backend/config.py to add:
   - neo4j_uri: str
   - neo4j_username: str
   - neo4j_password: str

5. Commit: "Add Neo4j graph service with indexing and queries"
```

---

## Execution Status Template

Create `.claude/epics/cognitive-memory/execution-status.md`:

```markdown
---
started: {datetime}
worktree: ../epic-cognitive-memory
branch: epic/cognitive-memory
---

# Execution Status: Cognitive Memory Schema

## Active Agents

| Agent | Task | Files | Status |
|-------|------|-------|--------|
| Agent-1 | Preference Schemas | memory/schemas/preference.py | 🔄 In Progress |
| Agent-2 | Decision Schemas | memory/schemas/decision.py | 🔄 In Progress |
| Agent-3 | Semantic Schemas | memory/schemas/semantic.py | 🔄 In Progress |
| Agent-4 | Narrative Schemas | memory/schemas/narrative.py | 🔄 In Progress |
| Agent-5 | Reasoning Schemas | memory/schemas/reasoning.py | 🔄 In Progress |
| Agent-6 | LLM Abstraction | llm/*.py | 🔄 In Progress |
| Agent-7 | Frontend Shell | frontend/src/pages/*.tsx | 🔄 In Progress |
| Agent-8 | Base Types | memory/schemas/base.py | 🔄 In Progress |
| Agent-9 | Storage Service | storage/*.py | 🔄 In Progress |
| Agent-10 | Neo4j Service | graph/*.py | 🔄 In Progress |

## Completed
- None yet

## Dependencies
- Agent 8 (Base Types) should complete before Agents 1-5
- Agent 9 (Storage) and Agent 10 (Neo4j) are independent of each other; both import `CanonicalRecord` from Agent 8's `base.py`

## Next Phase (After Schema)
- Core memory schemas (Person, Episode, Relationship)
- Memory Coordinator service
- Google sync services
```

---

## Launch Commands

```bash
# Launch all 10 agents in parallel using Task tool
# Each agent works on independent files in the same worktree
```

**Single message with 10 Task tool calls:**

```yaml
# Agent 8 first (base dependency)
Task:
  description: "Canonical base types"
  subagent_type: "general-purpose"
  prompt: |
    {Agent 8 prompt from above}

# Agents 1-5 in parallel (depend on Agent 8)
# Agents 6-7 in parallel (no dependencies)
# Agents 9-10 in parallel (no dependencies)
```

---

## Coordination Rules

1. **File Isolation**: Each agent only modifies their assigned files
2. **Dependency**: Agent 8 (Base Types) should complete before Agents 1-5 start
3. **Shared Files**: `backend/memory/schemas/__init__.py` (Agents 1-5 and 8) and `backend/config.py` (Agents 6, 9, and 10) - coordinate via sequential commits
4. **Commit Often**: Each agent commits after completing their work
5. **Push Regularly**: Push to remote for visibility
6. **No Conflicts**: File-level isolation means no merge conflicts outside the shared files listed in rule 3

---

## Post-Completion

After all 10 agents complete:

1. **Verify all schemas created**:
   ```bash
   cd ../epic-cognitive-memory
   ls backend/memory/schemas/
   ls backend/storage/
   ls backend/graph/
   ```

2. **Run schema validation**:
   ```bash
   cd ../epic-cognitive-memory
   uv run python -c "from backend.memory.schemas import *; print('All schemas imported successfully')"
   ```

3. **Test storage service**:
   ```bash
   cd ../epic-cognitive-memory
   uv run python -c "from backend.storage import get_storage_service; print('Storage service ready')"
   ```

4. **Test Neo4j connection**:
   ```bash
   cd ../epic-cognitive-memory
   uv run python -c "from backend.graph import Neo4jService; print('Neo4j service ready')"
   ```

5. **No database migration needed** - canonical storage is file-based

6. **Merge to main or continue to Phase 2**
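
The manual `ls` check in step 1 could also be scripted. The following is a minimal sketch and not part of the plan itself: the helper name `missing_files` and the `EXPECTED_FILES` list are hypothetical, with paths copied from the backend agents' file lists, and it assumes the worktree root is passed in by the caller.

```python
from pathlib import Path
from typing import Iterable, List

# Hypothetical checklist: paths copied from the "Files" lists of
# Agents 1-6 and 8-10 (the frontend files of Agent 7 are left out).
EXPECTED_FILES = [
    "backend/memory/__init__.py",
    "backend/memory/schemas/__init__.py",
    "backend/memory/schemas/base.py",
    "backend/memory/schemas/evidence.py",
    "backend/memory/schemas/claim.py",
    "backend/memory/schemas/preference.py",
    "backend/memory/schemas/decision.py",
    "backend/memory/schemas/semantic.py",
    "backend/memory/schemas/narrative.py",
    "backend/memory/schemas/reasoning.py",
    "backend/llm/__init__.py",
    "backend/llm/provider.py",
    "backend/llm/providers/__init__.py",
    "backend/llm/providers/openai.py",
    "backend/llm/providers/ollama.py",
    "backend/storage/__init__.py",
    "backend/storage/interface.py",
    "backend/storage/supabase.py",
    "backend/storage/s3.py",
    "backend/graph/__init__.py",
    "backend/graph/service.py",
    "backend/graph/queries.py",
]


def missing_files(root: Path, expected: Iterable[str] = EXPECTED_FILES) -> List[str]:
    """Return the expected relative paths that are not regular files under root."""
    return [rel for rel in expected if not (root / rel).is_file()]
```

Calling `missing_files(Path("../epic-cognitive-memory"))` from the main checkout should return an empty list once every backend agent has committed; any path it returns points at an agent whose deliverable is still outstanding.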