---
name: pipeline-assistant
description: This skill should be used when users need to create or fix Redpanda Connect pipeline configurations. Trigger when users mention "config", "pipeline", "YAML", "create a config", "fix my config", "validate my pipeline", or describe a streaming pipeline need like "read from Kafka and write to S3".
---

# Redpanda Connect Configuration Assistant

Create working, validated Redpanda Connect configurations from scratch, or repair existing configurations that have issues.

**This skill REQUIRES skills: `component-search`, `bloblang-authoring`.**

## Objective

Deliver a complete, valid YAML configuration that passes validation and meets the user's requirements. Whether starting from a description or fixing a broken config, the result must be production-ready with properly secured credentials.

Handle two scenarios:

- **Creation** - User provides a description like "Read from Kafka on localhost:9092 topic 'events' to stdout"
- **Repair** - User provides a config file path and optional error context

This skill focuses ONLY on pipeline configuration orchestration and validation.

**Skill Delegation**: NEVER directly use component-search or bloblang-authoring tools.

- **Component Discovery** - ALWAYS delegate to the `component-search` skill when it is unclear which components to use OR when you need component configuration details
- **Bloblang Development** - ALWAYS delegate to the `bloblang-authoring` skill when creating or fixing Bloblang transformations, and NEVER write Bloblang yourself

## Setup

This skill requires: `rpk`, `rpk connect`. See [SETUP](SETUP.md) for installation instructions.

## Tools

### Scaffold Pipeline

Generates a YAML configuration template from a component expression. Useful for quickly creating a first pipeline draft.

```bash
# Usage: rpk connect create [--small] <input>,...[/<processor>,...]/<output>,...

# Examples:
rpk connect create stdin/bloblang,awk/nats
rpk connect create file,http_server/protobuf/http_client  # Multiple inputs
rpk connect create kafka_franz/stdout                     # Only input and output, no processors
rpk connect create --small stdin/bloblang/stdout          # Minimal config, omit advanced fields
```

- Requires a component expression specifying the desired inputs, processors, and outputs
- Expression format: `inputs/processors/outputs` separated by `/`
- Multiple components of the same type are separated by `,`
- Outputs a complete YAML configuration with the specified components
- `--small` flag omits advanced fields

### Online Component Documentation

Use the `component-search` skill's `Online Component Documentation` tool to look up detailed configuration information for any Redpanda Connect component, including usage examples, field descriptions, and best practices.

### Lint Pipeline

Validates Redpanda Connect pipeline configurations.

```bash
# Usage: rpk connect lint [--env-file <.env>] <pipeline.yaml>

# Examples:
rpk connect lint --env-file ./.env ./pipeline.yaml
rpk connect lint pipeline-without-secrets.yaml
```

- Requires a pipeline configuration file path (e.g., `pipeline.yaml`)
- Optional `--env-file` flag provides a `.env` file for environment variable substitution
- Validates YAML syntax, component configurations, and Bloblang expressions
- Outputs detailed error messages with specific location information
- Exit code `0` indicates success, non-zero indicates validation failures
- Can be run repeatedly during pipeline development and iteration
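The scaffold and lint tools pair naturally: generate a draft, then validate it before editing further. A minimal sketch, assuming `rpk connect create` writes the scaffold to stdout and that the `redpanda` input and `mapping` processor fit the use case:

```bash
# Draft-and-validate loop (sketch; component choices are illustrative)
rpk connect create --small redpanda/mapping/stdout > pipeline.yaml
rpk connect lint pipeline.yaml && echo "scaffold is valid"
```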
### Run Pipeline

Executes a Redpanda Connect pipeline to test end-to-end functionality.

```bash
# Usage: rpk connect run [--log.level DEBUG] [--env-file <.env>] <pipeline.yaml>

# Examples:
rpk connect run pipeline-without-secrets.yaml
rpk connect run --env-file ./.env ./pipeline.yaml                    # With secrets
rpk connect run --log.level DEBUG --env-file ./.env ./pipeline.yaml  # With debug logging
```

- Requires a pipeline configuration file path (e.g., `pipeline.yaml`)
- Optional `--env-file` flag provides a dotenv file for environment variable substitution
- Optional `--log.level DEBUG` enables detailed logging for troubleshooting connection and processing issues
- Starts the pipeline and maintains active connections to inputs and outputs
- Runs continuously until manually terminated with Ctrl+C (SIGINT)
- Can be run repeatedly during pipeline development and iteration

### Test with Standard Input/Output

Test pipeline logic with `stdin`/`stdout` before connecting to real systems. Especially useful for validating routing logic, error handling, and transformations.

**Example: Content-based routing**

```yaml
input:
  stdin: {}

pipeline:
  processors:
    - mapping: |
        root = this
        # Route based on message type
        if this.type == "error" {
          meta route = "dlq"
        } else if this.priority == "high" {
          meta route = "urgent"
        } else {
          meta route = "standard"
        }

output:
  switch:
    cases:
      - check: 'meta("route") == "dlq"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "DLQ: " + content().string()'
      - check: 'meta("route") == "urgent"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "URGENT: " + content().string()'
      - check: 'meta("route") == "standard"'
        output:
          stdout: {}
          processors:
            - mapping: 'root = "STANDARD: " + content().string()'
```

**Test all routes:**

```bash
echo '{"type":"error","msg":"failed"}' | rpk connect run test.yaml
# Output: DLQ: {"type":"error","msg":"failed"}

echo '{"priority":"high","msg":"urgent"}' | rpk connect run test.yaml
# Output: URGENT: {"priority":"high","msg":"urgent"}

echo '{"priority":"low","msg":"normal"}' | rpk connect run test.yaml
# Output: STANDARD: {"priority":"low","msg":"normal"}
```

**Limitations:**

- Stdin/stdout cannot test batching behavior realistically
- No connection, retry, or timeout logic validation
- Cannot test ordering guarantees or parallel processing
- Real integration testing is still required before production deployment

## YAML Configuration Structure

Top-level keys:

- `input` - Data source (required): kafka_franz, http_server, stdin, aws_s3, etc.
- `output` - Data destination (required): kafka_franz, postgres, stdout, aws_s3, etc.
- `pipeline.processors` - Transformations (optional, execute sequentially)
- `cache_resources`, `rate_limit_resources` - Reusable components (optional); see the sketch at the end of this section

**Environment variables (required for secrets):**

```yaml
# Basic reference
broker: "${KAFKA_BROKER}"

# With default value
broker: "${KAFKA_BROKER:localhost:9092}"
```

**Field type conventions:**

- Durations: `"30s"`, `"5m"`, `"1h"`, `"100ms"`
- Sizes: `"5MB"`, `"1GB"`, `"512KB"`
- Booleans: `true`, `false` (no quotes)

**Minimal example:**

```yaml
input:
  redpanda:
    seed_brokers: ["${KAFKA_BROKER}"]
    topics: ["${TOPIC}"]

pipeline:
  processors:
    - mapping: |
        # Bloblang transformation - use bloblang-authoring skill to create
        root = this
        root.timestamp = now()

output:
  stdout: {}
```

Use the `Scaffold Pipeline` tool for initial drafts.
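The `cache_resources` key listed above has no example in this section. The following is a minimal sketch, assuming the built-in `memory` cache and the `dedupe` processor, of how a labeled resource is declared once and referenced by name:

```yaml
input:
  stdin: {}

cache_resources:
  - label: dedupe_cache
    memory:
      default_ttl: 60s   # duration string, per the field type conventions above

pipeline:
  processors:
    - dedupe:
        cache: dedupe_cache        # references the labeled resource
        key: '${! json("id") }'    # drop messages whose id was already seen

output:
  stdout: {}
```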
### Production Recipes/Patterns

The `./resources/recipes/` directory contains validated production patterns. Each recipe includes:

- **Markdown documentation** (`.md`) - Pattern explanation, configuration details, testing instructions, and variations
- **Working YAML configuration** (`.yaml`) - Complete, tested pipeline referenced in the markdown

**Before writing pipelines:**

1. **Read component documentation** - Use the `Online Component Documentation` tool for detailed field info and examples
2. **Read relevant recipes** - When the user describes a pattern matching a recipe (routing, DLQ, replication, etc.), read the markdown file first
3. **Adapt, don't copy** - Use recipes as a reference for patterns and best practices; customize for the user's specific requirements

#### Available Recipes

**Error Handling**

- `dlq-basic.md` - Dead letter queue for error handling

**Routing**

- `content-based-router.md` - Route messages by field values
- `multicast.md` - Fan-out to multiple destinations

**Replication**

- `kafka-replication.md` - Cross-cluster Kafka streaming
- `cdc-replication.md` - Database change data capture

**Cloud Storage**

- `s3-sink-basic.md` - S3 output with batching
- `s3-sink-time-based.md` - Time-partitioned S3 writes
- `s3-polling.md` - Poll S3 for new files

**Stateful Processing**

- `stateful-counter.md` - Stateful counting with cache
- `window-aggregation.md` - Time-window aggregations

**Performance & Monitoring**

- `rate-limiting.md` - Throughput control
- `custom-metrics.md` - Prometheus metrics

## Workflow

### Creating New Configurations

1. **Understand requirements**
   - Parse the description for source, destination, transformations, and special needs (ordering, batching, etc.)
   - Ask clarifying questions for ambiguous aspects
   - Check `./resources/recipes/` for relevant patterns
2. **Discover components**
   - Use the `component-search` skill if it is unclear which components to use
   - Read component documentation for configuration details
3. **Build configuration**
   - Generate a scaffold with `rpk connect create input/processor/output`
   - Add all required fields from component schemas
   - For secrets: ask the user for env var names → use `${VAR_NAME}` → document in `.env.example`
   - Keep the configuration minimal and simple
4. **Add transformations** (if needed)
   - Delegate to the `bloblang-authoring` skill for tested scripts
   - Embed in the `pipeline.processors` section
5. **Validate and iterate**
   - Run `rpk connect lint`
   - On errors: parse → fix → re-validate until clean
   - Iterate until validation passes
6. **Test and iterate**
   - Test with `rpk connect run`
   - Temporarily use `stdin` and `stdout` for easier testing (see the sketch after this list)
   - Fix any runtime issues
   - Test all edge cases
   - Iterate until tests pass
   - Test connection and authentication to real systems if possible
7. **Deliver**
   - Deliver the final `pipeline.yaml` and `.env.example`
   - Explain component choices and configuration decisions
   - Create a concise `TESTING.md` with only practical follow-up testing instructions:
     - How to set up the environment
     - Command to run the pipeline
     - Sample curl/test commands with realistic data
     - How to verify results in the target system
     - ONLY include new/essential information, avoid verbose explanations
   - NEVER create README files
   - Show a concise summary in the chat response
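For step 6, one approach is to keep the `pipeline` section untouched and temporarily comment out the real input and output in favor of `stdin`/`stdout`. A minimal sketch (the commented-out components and variable names are illustrative only):

```yaml
input:
  stdin: {}
  # Real input, restored after local testing:
  # redpanda:
  #   seed_brokers: ["${KAFKA_BROKER}"]
  #   topics: ["${TOPIC}"]

pipeline:
  processors:
    - mapping: |
        root = this
        root.processed_at = now()

output:
  stdout: {}
  # Real output, restored after local testing:
  # aws_s3:
  #   bucket: "${S3_BUCKET}"
  #   path: 'events/${! timestamp_unix() }.json'
```

Pipe a sample event through it (`echo '{"id":1}' | rpk connect run pipeline.yaml`), then restore the real components and re-lint before delivery.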
### Repairing Existing Configurations

1. **Diagnose**
   - Run `rpk connect lint` to identify errors
   - Review user-provided context about symptoms
   - Find root causes (typos, deprecations, type mismatches)
2. **Explain issues**
   - Translate validation errors to plain language
   - Explain why the current configuration doesn't work
   - Identify root causes, not just symptoms
3. **Fix minimally**
   - Get user approval before modifying files
   - Preserve the original structure, comments, and intent
   - Replace deprecated components if needed
   - Apply secret handling with environment variables
4. **Verify**
   - Re-validate after each change
   - Test modified Bloblang transformations
   - Confirm no regressions were introduced

### Security Requirements (Critical)

**Never store credentials in plain text:**

- All passwords, secrets, tokens, and API keys MUST use `${ENV_VAR}` syntax in YAML
- Never put actual credentials in YAML or in the conversation

**Environment variable files:**

- `.env` - Contains actual secret values, used at runtime with `--env-file .env`, NEVER commit to git
- `.env.example` - Documents required variables with placeholder values, safe to commit
- Always remind the user to add `.env` to `.gitignore`

**When encountering sensitive fields** (marked as secret in the component schema):

1. Ask the user for an environment variable name (e.g., `KAFKA_PASSWORD`)
2. Write `${KAFKA_PASSWORD}` in the YAML configuration
3. Document it in `.env.example`: `KAFKA_PASSWORD=your_password_here`
4. The user creates the actual `.env` with the real value: `KAFKA_PASSWORD=actual_secret_123`
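To make these steps concrete, here is a sketch pairing sensitive fields with environment variables; the `kafka_franz` SASL layout and all names are illustrative and should be confirmed via the `Online Component Documentation` tool:

```yaml
# pipeline.yaml (excerpt) - no literal secrets, only ${ENV_VAR} references
input:
  kafka_franz:
    seed_brokers: ["${KAFKA_BROKER}"]
    topics: ["${TOPIC}"]
    sasl:
      - mechanism: SCRAM-SHA-256
        username: "${KAFKA_USERNAME}"
        password: "${KAFKA_PASSWORD}"   # resolved at runtime via --env-file .env
```

The matching `.env.example` documents `KAFKA_BROKER`, `TOPIC`, `KAFKA_USERNAME`, and `KAFKA_PASSWORD` with placeholder values and is safe to commit, while the real `.env` holds the actual credentials and stays out of git.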