---
# Score workload specification for the data-pipeline-worker.
# Declares two containers (the worker itself and a statsd exporter sidecar),
# the service ports, and every external resource the workload references
# through ${resources.<name>.<output>} placeholders.
apiVersion: score.dev/v1b1

metadata:
  name: data-pipeline-worker
  annotations:
    app.kubernetes.io/component: worker
    app.kubernetes.io/part-of: data-platform
    processing.company.io/queue: high-priority
    monitoring.company.io/alerts: critical

service:
  ports:
    # Same port number as METRICS_PORT in the worker container.
    metrics:
      port: 9090
      targetPort: 9090
      protocol: TCP
    # Same port number as HEALTH_PORT and the worker readiness probe.
    health:
      port: 8081
      targetPort: 8081
      protocol: TCP

containers:
  worker:
    image: 123456789012.dkr.ecr.us-west-2.amazonaws.com/data-worker:3.1.0
    command:
      - /app/worker
    args:
      # Points at the inline config file declared under `files` below.
      - --config=/etc/worker/config.yaml
      - --workers=4
      - --batch-size=1000
      - --log-format=json
    variables:
      WORKER_ID: ${metadata.name}

      # Kafka wiring; the broker list is an output of the kafka-cluster
      # resource.
      KAFKA_BROKERS: ${resources.kafka-cluster.brokers}
      KAFKA_TOPIC_INPUT: raw-events
      KAFKA_TOPIC_OUTPUT: processed-events
      KAFKA_CONSUMER_GROUP: data-pipeline-workers
      SCHEMA_REGISTRY_URL: ${resources.schema-registry.url}

      # MongoDB connection comes from the timeseries-db resource.
      MONGODB_URI: ${resources.timeseries-db.uri}
      MONGODB_DATABASE: analytics

      # S3 location and credentials are resource outputs, not literals.
      S3_BUCKET: ${resources.data-lake.bucket}
      S3_REGION: ${resources.data-lake.region}
      AWS_ACCESS_KEY_ID: ${resources.aws-credentials.access_key_id}
      AWS_SECRET_ACCESS_KEY: ${resources.aws-credentials.secret_access_key}

      # Numeric settings are quoted so they stay strings.
      CHECKPOINT_INTERVAL_MS: "60000"
      MAX_POLL_RECORDS: "500"
      PROCESSING_TIMEOUT_MS: "30000"
      DEAD_LETTER_TOPIC: failed-events
      METRICS_PORT: "9090"
      HEALTH_PORT: "8081"

      OTEL_EXPORTER_OTLP_ENDPOINT: ${resources.observability.otlp_endpoint}
      OTEL_SERVICE_NAME: data-pipeline-worker
    files:
      # Inline worker configuration. noExpand is not set here, so the
      # ${metadata.name} placeholder inside the content is subject to
      # placeholder expansion.
      /etc/worker/config.yaml:
        content: |
          worker:
            name: ${metadata.name}
            parallelism: 4
            batchSize: 1000
            flushInterval: 5s
          kafka:
            consumer:
              autoOffsetReset: earliest
              enableAutoCommit: false
              maxPollIntervalMs: 300000
            producer:
              acks: all
              retries: 3
              compressionType: snappy
          processing:
            transforms:
              - name: parse-json
                type: json-parser
              - name: enrich-geo
                type: geo-lookup
                config:
                  database: /data/geoip/GeoLite2-City.mmdb
              - name: anonymize-pii
                type: pii-filter
                config:
                  fields:
                    - email
                    - phone
                    - ip_address
              - name: aggregate-metrics
                type: aggregator
                config:
                  window: 1m
                  groupBy:
                    - country
                    - device_type
          output:
            formats:
              - parquet
              - json
            partitioning:
              type: time
              granularity: hourly
        mode: "0644"
      # Schema files are loaded from disk with placeholder expansion
      # disabled.
      /etc/worker/schemas/event.avsc:
        source: ./schemas/event.avsc
        mode: "0644"
        noExpand: true
      /etc/worker/schemas/output.avsc:
        source: ./schemas/output.avsc
        mode: "0644"
        noExpand: true
    volumes:
      # Mounted read-only; this is the directory the enrich-geo transform's
      # database path points into.
      /data/geoip:
        source: ${resources.geoip-database.source}
        readOnly: true
      /data/checkpoints:
        source: ${resources.checkpoint-volume.source}
        readOnly: false
      /data/tmp:
        source: ${resources.scratch-volume.source}
        path: worker-tmp
        readOnly: false
    resources:
      requests:
        memory: 2Gi
        cpu: "1"
      limits:
        memory: 8Gi
        cpu: "4"
    readinessProbe:
      httpGet:
        path: /ready
        port: 8081
        scheme: HTTP
    # NOTE(review): the liveness command names Kafka and MongoDB connection
    # checks. If /app/healthcheck fails while those dependencies are
    # unreachable, an outage there would restart the worker - confirm this
    # is intended.
    livenessProbe:
      exec:
        command:
          - /app/healthcheck
          - --check=kafka-connection
          - --check=mongodb-connection
          - --timeout=10s

  # Sidecar running the Prometheus statsd exporter image.
  # NOTE(review): its web port 9102 is not listed under service.ports -
  # confirm it is reached some other way.
  metrics-exporter:
    image: prom/statsd-exporter:v0.24.0
    args:
      - --statsd.listen-udp=:9125
      - --web.listen-address=:9102
      - --statsd.mapping-config=/etc/statsd/mapping.yaml
    files:
      /etc/statsd/mapping.yaml:
        content: |
          mappings:
            - match: "worker.*.processing.*"
              name: "worker_processing"
              labels:
                worker_id: "$1"
                metric: "$2"
            - match: "worker.*.kafka.*"
              name: "worker_kafka"
              labels:
                worker_id: "$1"
                operation: "$2"
            - match: "worker.*.errors.*"
              name: "worker_errors_total"
              labels:
                worker_id: "$1"
                error_type: "$2"
        mode: "0644"
        # The `$1`/`$2` tokens are meant for the exporter, not for Score;
        # disable placeholder expansion so the content is written verbatim,
        # as is already done for the schema files.
        noExpand: true
    resources:
      # Binary (Mi) units, consistent with the Gi units used by the worker
      # container.
      requests:
        memory: 32Mi
        cpu: 25m
      limits:
        memory: 64Mi
        cpu: 100m
    readinessProbe:
      httpGet:
        path: /metrics
        port: 9102
        scheme: HTTP

# External dependencies. Each key is the name used in
# ${resources.<name>.<output>} placeholders above.
resources:
  kafka-cluster:
    type: kafka
    class: managed
    id: data-platform.events
    metadata:
      annotations:
        kafka.company.io/retention: "7d"
        kafka.company.io/partitions: "24"
    params:
      # Quoted so the version stays a string rather than a float.
      version: "3.5"
      securityProtocol: SASL_SSL
      saslMechanism: SCRAM-SHA-512

  schema-registry:
    type: service
    id: data-platform.schema-registry
    params:
      compatibilityLevel: BACKWARD

  timeseries-db:
    type: mongodb
    class: timeseries
    id: analytics.metrics
    params:
      version: "7.0"
      sharded: true
      replicaSet: analytics-rs
      writeConcern: majority

  data-lake:
    type: s3
    class: data-lake
    id: analytics.raw-data
    metadata:
      annotations:
        storage.company.io/tier: intelligent-tiering
    params:
      versioning: false
      lifecycle:
        - prefix: raw/
          transitionDays: 30
          storageClass: GLACIER
        - prefix: processed/
          expirationDays: 365

  aws-credentials:
    type: secret
    class: aws-iam
    id: data-platform.s3-access
    params:
      policy: s3-read-write

  geoip-database:
    type: volume
    class: configmap
    id: shared.geoip
    params:
      updateSchedule: weekly

  # No `id` is set on the two volumes below.
  checkpoint-volume:
    type: volume
    class: ssd
    params:
      size: 100Gi
      accessMode: ReadWriteOnce

  scratch-volume:
    type: volume
    class: ephemeral
    params:
      size: 50Gi

  observability:
    type: service
    id: shared.observability
    params:
      traces: true
      metrics: true
      logs: true