---
name: building-rag-systems
description: |
  Build production RAG systems with semantic chunking, incremental indexing, and filtered retrieval.
  Use when implementing document ingestion pipelines, vector search with Qdrant, or context-aware retrieval.
  Covers chunking strategies, change detection, payload indexing, and context expansion.
  NOT when doing simple similarity search without production requirements.
---

# Building RAG Systems

Production-grade RAG with semantic chunking, incremental updates, and filtered retrieval.

## Quick Start

```bash
# Dependencies
pip install qdrant-client openai pydantic python-frontmatter

# Core components
# 1. Crawler  → discovers files, extracts path metadata
# 2. Parser   → extracts frontmatter, computes file hash
# 3. Chunker  → semantic split on ## headers, 400 tokens, 15% overlap
# 4. Embedder → batched OpenAI embeddings
# 5. Uploader → Qdrant upsert with indexed payloads
```

---

## Ingestion Pipeline

### Architecture

```
┌──────────┐    ┌────────┐    ┌─────────┐    ┌──────────┐    ┌──────────┐
│ Crawler  │ -> │ Parser │ -> │ Chunker │ -> │ Embedder │ -> │ Uploader │
└──────────┘    └────────┘    └─────────┘    └──────────┘    └──────────┘
     │              │              │              │               │
 Discovers      Extracts      Splits by      Generates       Upserts to
 files          frontmatter   semantic       vectors         Qdrant
                + file hash   boundaries     (batched)       (batched)
```

### Semantic Chunking (NOT Fixed-Size)

```python
import hashlib
import re

# `Chunk` is assumed to be a Pydantic model carrying the fields set below;
# `next_chunk_id` must default to None because it is filled in afterwards.


class SemanticChunker:
    """
    Production chunking:
    - Split on ## headers (semantic boundaries)
    - Target 400 tokens (NVIDIA benchmark optimal; tune for your corpus)
    - 15% overlap for context continuity
    - Track prev/next for context expansion
    """

    # Zero-width lookahead: split *before* each "## " so the header stays with its section
    SECTION_PATTERN = re.compile(r"(?=^## )", re.MULTILINE)
    TOKENS_PER_WORD = 1.3  # rough heuristic; use a real tokenizer for exact counts

    def __init__(
        self,
        target_tokens: int = 400,
        max_tokens: int = 512,
        overlap_percent: float = 0.15,
    ):
        self.max_tokens = max_tokens
        self.target_words = int(target_tokens / self.TOKENS_PER_WORD)
        self.overlap_words = int(self.target_words * overlap_percent)

    def chunk(self, content: str, file_hash: str) -> list[Chunk]:
        # Drop empty pieces (e.g. when the file starts with a "## " header).
        # This method splits on headers only: a section longer than
        # target_words still needs a secondary split with overlap_words.
        sections = [s for s in self.SECTION_PATTERN.split(content) if s.strip()]
        chunks: list[Chunk] = []

        for idx, section in enumerate(sections):
            content_hash = hashlib.sha256(section.encode()).hexdigest()[:16]
            # Qdrant point IDs must be unsigned integers or UUIDs, so map this
            # string to one (e.g. uuid.uuid5) at upload time.
            chunk_id = f"{file_hash[:8]}_{content_hash}_{idx}"

            chunks.append(Chunk(
                id=chunk_id,
                text=section,
                chunk_index=idx,
                total_chunks=len(sections),
                prev_chunk_id=chunks[-1].id if chunks else None,
                content_hash=content_hash,
                source_file_hash=file_hash,
            ))

            # Set next_chunk_id on previous
            if len(chunks) > 1:
                chunks[-2].next_chunk_id = chunk_id

        return chunks
```

### Change Detection (Incremental Updates)

```python
import hashlib

from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue


def compute_file_hash(file_path: str) -> str:
    """SHA-256 for change detection."""
    with open(file_path, "rb") as f:
        return hashlib.sha256(f.read()).hexdigest()


class QdrantStateTracker:
    """Query Qdrant payloads directly - no external state DB needed."""

    def __init__(self, client: QdrantClient, collection: str):
        self.client = client
        self.collection = collection

    def get_indexed_files(self, book_id: str) -> dict[str, str]:
        """Returns {file_path: file_hash} from Qdrant."""
        indexed = {}
        offset = None

        while True:
            # scroll() returns one page of points plus the offset of the next page
            points, next_offset = self.client.scroll(
                collection_name=self.collection,
                scroll_filter=Filter(must=[
                    FieldCondition(key="book_id", match=MatchValue(value=book_id))
                ]),
                limit=100,
                offset=offset,
                with_payload=["source_file", "source_file_hash"],
                with_vectors=False,
            )

            for point in points:
                indexed[point.payload["source_file"]] = point.payload["source_file_hash"]

            if next_offset is None:
                break
            offset = next_offset

        return indexed

    def detect_changes(
        self, current: dict[str, str], indexed: dict[str, str]
    ) -> tuple[list[str], list[str], list[str]]:
        """Compare filesystem vs index. Returns (new, modified, deleted) paths."""
        new = [p for p in current if p not in indexed]
        deleted = [p for p in indexed if p not in current]
        modified = [p for p in current if p in indexed and current[p] != indexed[p]]
        return new, modified, deleted
```

### Batched Embeddings

```python
from openai import OpenAI


class OpenAIEmbedder:
    def __init__(self, model: str = "text-embedding-3-small", batch_size: int = 20):
        self.client = OpenAI()  # reads OPENAI_API_KEY from the environment
        self.model = model
        self.batch_size = batch_size  # tune against your rate limits

    def embed_chunks(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
        embedded = []

        for i in range(0, len(chunks), self.batch_size):
            batch = chunks[i:i + self.batch_size]
            response = self.client.embeddings.create(
                input=[c.text for c in batch],
                model=self.model,
            )

            # response.data is returned in the same order as the inputs
            for chunk, data in zip(batch, response.data):
                # model_dump() is Pydantic v2; use chunk.dict() on v1
                embedded.append(EmbeddedChunk(**chunk.model_dump(), embedding=data.embedding))

        return embedded
```

### Qdrant Collection with Payload Indexes

```python
from qdrant_client.models import Distance, PayloadSchemaType, VectorParams


def create_collection(self, recreate: bool = False):
    """Create collection with proper indexes for filtered retrieval."""
    if self.client.collection_exists(self.collection):
        if not recreate:
            return  # keep the existing collection and its indexes
        self.client.delete_collection(self.collection)

    self.client.create_collection(
        collection_name=self.collection,
        # 1536 is the output size of text-embedding-3-small
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
    )

    # Index ALL fields you filter by
    indexes = [
        ("book_id", PayloadSchemaType.KEYWORD),            # Tenant isolation
        ("module", PayloadSchemaType.KEYWORD),             # Content filter
        ("chapter", PayloadSchemaType.INTEGER),            # Range filter
        ("hardware_tier", PayloadSchemaType.INTEGER),      # Personalization
        ("proficiency_level", PayloadSchemaType.KEYWORD),
        ("parent_doc_id", PayloadSchemaType.KEYWORD),      # Context expansion
        ("source_file_hash", PayloadSchemaType.KEYWORD),   # Change detection
    ]

    for field, schema in indexes:
        self.client.create_payload_index(
            collection_name=self.collection,
            field_name=field,
            field_schema=schema,
        )
```

---

## Retrieval Patterns

### Comprehensive Filter Builder

```python
from qdrant_client.models import FieldCondition, Filter, MatchAny, MatchValue, Range


def build_filter(self, query: SearchQuery) -> Filter:
    """Build Qdrant filter with all conditions (AND logic)."""
    conditions = []

    # Required: Tenant isolation
    conditions.append(FieldCondition(
        key="book_id",
        match=MatchValue(value=query.book_id)
    ))

    # Required: Hardware tier (lte = "tier X or lower")
    conditions.append(FieldCondition(
        key="hardware_tier",
        range=Range(lte=query.hardware_tier)
    ))

    # Optional: Module exact match
    if query.module:
        conditions.append(FieldCondition(
            key="module",
            match=MatchValue(value=query.module)
        ))

    # Optional: Chapter range ("is not None" so that chapter 0 is not skipped)
    if query.chapter_min is not None or query.chapter_max is not None:
        conditions.append(FieldCondition(
            key="chapter",
            range=Range(gte=query.chapter_min, lte=query.chapter_max),
        ))

    # Optional: Proficiency OR logic
    if query.proficiency_levels:
        conditions.append(FieldCondition(
            key="proficiency_level",
            match=MatchAny(any=query.proficiency_levels),
        ))

    return Filter(must=conditions)
```

### Context Expansion (Walk Chunk Chain)

```python
def expand_context(self, chunk_id: str, prev: int = 1, next: int = 1) -> list[Chunk]:
    """Walk prev_chunk_id/next_chunk_id chain for surrounding context."""
    current = self.get_chunk_by_id(chunk_id)
    if not current:
        return []

    # Walk backwards, prepending so the result stays in document order
    prev_chunks = []
    prev_id = current.prev_chunk_id
    for _ in range(prev):
        if not prev_id:
            break
        chunk = self.get_chunk_by_id(prev_id)
        if not chunk:
            break
        prev_chunks.insert(0, chunk)
        prev_id = chunk.prev_chunk_id

    # Walk forwards
    next_chunks = []
    next_id = current.next_chunk_id
    for _ in range(next):
        if not next_id:
            break
        chunk = self.get_chunk_by_id(next_id)
        if not chunk:
            break
        next_chunks.append(chunk)
        next_id = chunk.next_chunk_id

    return prev_chunks + [current] + next_chunks
```

### Full Document Retrieval

```python
def get_document_chunks(self, parent_doc_id: str) -> list[Chunk]:
    """Get all chunks for a document, ordered by chunk_index."""
    points = []
    offset = None

    # Paginate so documents with more than 100 chunks are not truncated
    while True:
        page, offset = self.client.scroll(
            collection_name=self.collection,
            scroll_filter=Filter(must=[
                FieldCondition(key="parent_doc_id", match=MatchValue(value=parent_doc_id))
            ]),
            limit=100,
            offset=offset,
            with_payload=True,
            with_vectors=False,
        )
        points.extend(page)
        if offset is None:
            break

    chunks = [self._to_chunk(p) for p in points]
    chunks.sort(key=lambda c: c.chunk_index)
    return chunks
```

---

## Payload Schema

```python
from typing import Optional

from pydantic import BaseModel


class ChunkPayload(BaseModel):
    """Complete payload for filtered retrieval and context expansion."""

    # Tenant isolation
    book_id: str

    # Content filters (indexed; add a `lesson` index if you filter on it)
    module: str
    chapter: int
    lesson: int
    hardware_tier: int
    proficiency_level: str

    # Display content
    text: str
    section_title: Optional[str] = None
    source_file: str

    # Context expansion
    parent_doc_id: str
    chunk_index: int
    total_chunks: int
    prev_chunk_id: Optional[str] = None  # None for the first chunk
    next_chunk_id: Optional[str] = None  # None for the last chunk

    # Change detection
    content_hash: str
    source_file_hash: str
```

---

## Anti-Patterns

| Don't | Do Instead |
|-------|------------|
| Fixed character chunking | Semantic boundaries (## headers) |
| Position-based chunk IDs | Content hash for stable IDs |
| No overlap between chunks | 10-20% overlap for continuity |
| Full re-index on every change | Incremental with file hash detection |
| Missing payload indexes | Index every field you filter by |
| Synchronous embedding | Batch with background jobs |
| External state database | Qdrant-native state tracking |

---

## Verification

Run: `python scripts/verify.py`

## Related Skills

- `scaffolding-fastapi-dapr` - API patterns for search endpoints
- `streaming-llm-responses` - Streaming RAG responses

## References

- [references/ingestion-patterns.md](references/ingestion-patterns.md) - Full ingestion pipeline
- [references/retrieval-patterns.md](references/retrieval-patterns.md) - Filter strategies, context expansion
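
A note on the Semantic Chunking section: `SemanticChunker` computes `target_words` and `overlap_words`, but the `chunk` method as written splits on `## ` headers only, so a long section is never broken up. The sketch below shows one way such a section could be split into overlapping word windows. The function name `split_with_overlap` is hypothetical and not part of the original pipeline; the defaults simply mirror `int(400 / 1.3)` and `int(307 * 0.15)`. It splits on whitespace, so it discards the original line breaks, which may not be acceptable for Markdown with code blocks or tables.

```python
def split_with_overlap(
    text: str,
    target_words: int = 307,   # int(400 tokens / 1.3 tokens per word)
    overlap_words: int = 46,   # int(307 * 0.15)
) -> list[str]:
    """Split text into windows of at most target_words words.

    Each window after the first repeats the last overlap_words words of the
    previous window. Hypothetical helper: an illustration, not the original
    pipeline's implementation.
    """
    if target_words <= 0:
        raise ValueError("target_words must be positive")
    if not 0 <= overlap_words < target_words:
        raise ValueError("overlap_words must be in [0, target_words)")

    words = text.split()  # whitespace split; original line breaks are lost
    if not words:
        return []
    if len(words) <= target_words:
        return [" ".join(words)]

    step = target_words - overlap_words  # how far each window advances
    windows = []
    for start in range(0, len(words), step):
        windows.append(" ".join(words[start:start + target_words]))
        # Stop once a window has reached the end of the text, so no trailing
        # window made only of already-seen overlap is emitted.
        if start + target_words >= len(words):
            break
    return windows
```

One way to use it would be to call it inside `chunk` for any section whose word count exceeds `self.target_words`, then build one `Chunk` per window so that `chunk_index`, `total_chunks`, and the prev/next links count windows rather than sections.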