--- name: llm-pipeline description: Pydantic-AI agents, RAG, embeddings for Pulse Radar knowledge extraction. --- # LLM Pipeline Skill Pulse Radar uses LLM pipeline to transform raw Telegram messages into structured knowledge. Core philosophy: Messages individually are noise; batched extraction reveals patterns. ``` TOPICS (categories) └─ ATOMS (knowledge units: problem/solution/decision/insight...) └─ MESSAGES (raw data, hidden layer) ``` ```python # 1. Message arrives via Telegram webhook await save_telegram_message(message) # triggers TaskIQ # 2. Scoring (AI Judge, not heuristics - ADR-003) score = await importance_scorer.score(message) # classification: SIGNAL (>0.6) / NOISE (<0.3) # 3. Auto-trigger extraction when threshold met if unprocessed_count >= 10: # ai_config.message_threshold await extract_knowledge_from_messages_task.kiq() # 4. KnowledgeOrchestrator runs Pydantic AI agent agent = Agent( model=model, system_prompt=get_extraction_prompt("uk"), output_type=KnowledgeExtractionOutput, # CRITICAL: structured output output_retries=5, ) result = await agent.run(messages_content) # 5. Save to DB + embed await save_topics_and_atoms(result.output) await embed_atoms_batch_task.kiq(atom_ids) ``` ```python from pydantic_ai import Agent from pydantic_ai.models.openai import OpenAIChatModel # Provider-specific model creation if provider.type == "ollama": model = OpenAIChatModel( model_name=agent_config.model_name, provider=OllamaProvider(base_url=provider.base_url), ) elif provider.type == "openai": model = OpenAIChatModel( model_name=agent_config.model_name, provider=OpenAIProvider(api_key=api_key), ) # Agent with structured output agent = Agent( model=model, output_type=MyPydanticModel, # Forces JSON schema system_prompt="...", output_retries=5, ) ``` 1. **JSON-only output** — explicitly state "respond with ONLY JSON" 2. **Schema in prompt** — include exact JSON structure expected 3. **Language enforcement** — "ALL fields MUST be in Ukrainian" 4. **Retry on language mismatch** — use `get_strengthened_prompt()` 5. **No markdown** — models often wrap JSON in ```json blocks ```python # OpenAI: 1536 dimensions (text-embedding-3-small) # Ollama: 1024 dimensions (mxbai-embed-large) → padded to 1536 await embedding_service.generate_embedding(text) await embedding_service.embed_messages_batch(session, ids, batch_size=10) ``` ```python # SemanticSearchService uses pgvector cosine similarity similar_atoms = await search_service.search_atoms( query_embedding=embedding, limit=5, threshold=0.65, # ai_config.semantic_search ) # RAGContextBuilder assembles context for LLM context = await rag_builder.build_context( query=user_query, similar_atoms=similar_atoms, related_messages=messages, ) ``` ## RAG vs CAG | Strategy | Data Type | Pulse Radar Use | |----------|-----------|-----------------| | **RAG** | Dynamic (messages, atoms) | Semantic search, history retrieval | | **CAG** | Static (project config) | Keywords, glossary, components preloaded | **Hybrid:** Project context (CAG) + similar atoms (RAG) = best extraction quality. See: @references/rag.md for detailed comparison. - **ADR-003:** AI Importance Scoring — LLM Judge vs Heuristics (LLM chosen) - **ADR-006:** Pydantic AI vs LangChain — Hexagonal architecture (Pydantic AI chosen) - @references/architecture.md — Hexagonal LLM domain structure - @references/pydantic-ai.md — Agent configuration, streaming - @references/rag.md — RAG & CAG context strategies