---
description: Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages (project)
---
# Add Kafka Consumer Skill
Add KafkaFlow consumer handlers for processing Kafka/Redpanda messages in NovaTune.
## Project Context
- Handlers location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/`
- Message types: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/`
- Topic naming: `{prefix}-{topic-name}` (e.g., `dev-track-deletions`)
## Steps
### 1. Create Message Type (if needed)
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Messaging/Messages/{EventName}.cs`
```csharp
namespace NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;
/// <summary>
/// Event published when a track is soft-deleted.
/// </summary>
public record TrackDeletedEvent
{
    // Contract version carried on every message; defaults to 2 when not set explicitly.
    public int SchemaVersion { get; init; } = 2;

    // Identity and storage fields. All are 'required', so an object initializer
    // must supply them or the code does not compile.
    public required string TrackId { get; init; }
    public required string UserId { get; init; }
    public required string ObjectKey { get; init; }

    // The only optional member: nullable and not marked 'required'.
    public string? WaveformObjectKey { get; init; }

    public required long FileSizeBytes { get; init; }

    // Timestamps are DateTimeOffset so the UTC offset travels with the value.
    public required DateTimeOffset DeletedAt { get; init; }
    public required DateTimeOffset ScheduledDeletionAt { get; init; }

    public required string CorrelationId { get; init; }
    public required DateTimeOffset Timestamp { get; init; }
}
```
### 2. Create Handler Class
Location: `src/NovaTuneApp/NovaTuneApp.Workers.{Name}/Handlers/{EventName}Handler.cs`
```csharp
using KafkaFlow;
using NovaTuneApp.ApiService.Infrastructure.Messaging.Messages;
namespace NovaTuneApp.Workers.Lifecycle.Handlers;
/// <summary>
/// Handles TrackDeletedEvent messages for immediate cache invalidation.
/// </summary>
public class TrackDeletedHandler : IMessageHandler<TrackDeletedEvent>
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<TrackDeletedHandler> _logger;

    /// <summary>
    /// Creates the handler.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to open one DI scope per message.</param>
    /// <param name="logger">Typed logger; the default container only resolves the generic <c>ILogger&lt;T&gt;</c>.</param>
    public TrackDeletedHandler(
        IServiceProvider serviceProvider,
        ILogger<TrackDeletedHandler> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Invalidates the cache entries for the deleted track.
    /// </summary>
    /// <param name="context">KafkaFlow message context; supplies the worker-stopped cancellation token.</param>
    /// <param name="message">The deserialized deletion event.</param>
    /// <remarks>Any exception is logged and rethrown so it reaches the consumer pipeline.</remarks>
    public async Task Handle(IMessageContext context, TrackDeletedEvent message)
    {
        // One scope per message so scoped services are disposed when handling ends.
        using var scope = _serviceProvider.CreateScope();

        _logger.LogInformation(
            "Processing TrackDeletedEvent for track {TrackId}, user {UserId}",
            message.TrackId,
            message.UserId);

        try
        {
            // Get services from scoped container.
            // NOTE(review): the original generic argument was lost from this snippet;
            // ICacheService is assumed. Substitute the worker's actual cache abstraction
            // (and its using directive).
            var cacheService = scope.ServiceProvider.GetRequiredService<ICacheService>();

            // Perform idempotent operations
            await cacheService.InvalidateTrackCacheAsync(
                message.TrackId,
                message.UserId,
                context.ConsumerContext.WorkerStopped);

            _logger.LogDebug(
                "Successfully processed TrackDeletedEvent for {TrackId}, scheduled deletion at {ScheduledAt}",
                message.TrackId,
                message.ScheduledDeletionAt);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex,
                "Failed to process TrackDeletedEvent for track {TrackId}",
                message.TrackId);

            // Re-throw to trigger retry/DLQ behavior
            throw;
        }
    }
}
```
### 3. Register Consumer in Program.cs
```csharp
// Topic/group names are prefixed per environment; fall back to "dev" when unset.
var topicPrefix = builder.Configuration["NovaTune:TopicPrefix"] ?? "dev";
var bootstrapServers = builder.Configuration.GetConnectionString("messaging")
    ?? "localhost:9092";

builder.Services.AddKafka(kafka => kafka
    .UseMicrosoftLog()
    .AddCluster(cluster =>
    {
        cluster.WithBrokers([bootstrapServers]);

        // Register consumer for track deletions
        cluster.AddConsumer(consumer => consumer
            .Topic($"{topicPrefix}-track-deletions")
            .WithGroupId($"{topicPrefix}-lifecycle-worker")
            .WithBufferSize(100)
            .WithWorkersCount(2)
            .WithAutoOffsetReset(KafkaFlow.AutoOffsetReset.Earliest)
            .WithConsumerConfig(new ConsumerConfig
            {
                SessionTimeoutMs = 45000,
                SocketTimeoutMs = 30000,
                ReconnectBackoffMs = 1000
            })
            .AddMiddlewares(m => m
                // NOTE(review): the deserializer type argument was lost from the original
                // snippet; JsonCoreDeserializer (KafkaFlow.Serializer.JsonCore) is assumed.
                // Use whichever deserializer matches the producer's serializer.
                .AddDeserializer<JsonCoreDeserializer>()
                .AddTypedHandlers(h => h.AddHandler<TrackDeletedHandler>())
            )
        );
    })
);

// Register handler in DI
builder.Services.AddTransient<TrackDeletedHandler>();
```
### 4. Add KafkaFlow Hosted Service
```csharp
// AddHostedService requires the concrete IHostedService type as its generic argument.
builder.Services.AddHostedService<KafkaFlowHostedService>();
```
The hosted service manages the KafkaFlow bus lifecycle:
```csharp
/// <summary>
/// Background service that starts the KafkaFlow bus, retrying on startup failure,
/// and stops it when the host shuts down.
/// </summary>
internal class KafkaFlowHostedService : BackgroundService
{
    private const int MaxRetries = 30;
    private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(2);

    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaFlowHostedService> _logger;
    private IKafkaBus? _kafkaBus;

    /// <summary>
    /// Creates the hosted service.
    /// </summary>
    /// <param name="serviceProvider">Provider from which the Kafka bus is created.</param>
    /// <param name="logger">Typed logger; the default container only resolves the generic <c>ILogger&lt;T&gt;</c>.</param>
    public KafkaFlowHostedService(
        IServiceProvider serviceProvider,
        ILogger<KafkaFlowHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Waits briefly, then tries up to <see cref="MaxRetries"/> times to start the bus.
    /// Once started, it idles until <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when shutdown begins.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Starting KafkaFlow bus...");

        try
        {
            // Initial grace period before the first connection attempt.
            await Task.Delay(TimeSpan.FromSeconds(2), stoppingToken);

            for (var attempt = 1; attempt <= MaxRetries; attempt++)
            {
                try
                {
                    _kafkaBus = _serviceProvider.CreateKafkaBus();
                    await _kafkaBus.StartAsync(stoppingToken);
                    _logger.LogInformation("KafkaFlow bus started on attempt {Attempt}", attempt);

                    // Park here until shutdown; cancellation surfaces as OperationCanceledException.
                    await Task.Delay(Timeout.Infinite, stoppingToken);
                    return;
                }
                catch (Exception ex) when (ex is not OperationCanceledException)
                {
                    _logger.LogWarning(ex,
                        "Failed to start KafkaFlow bus (attempt {Attempt}/{Max})",
                        attempt, MaxRetries);

                    if (attempt < MaxRetries)
                    {
                        await Task.Delay(RetryDelay, stoppingToken);
                    }
                    else
                    {
                        _logger.LogError(ex, "Failed after {Max} attempts", MaxRetries);
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // A single handler covers the initial delay, startup, the idle wait and the
            // retry delay, so cancellation always ends the loop quietly with this log line.
            _logger.LogInformation("KafkaFlow bus stopping due to cancellation");
        }
    }

    /// <summary>
    /// Stops the bus, if one was created, before delegating to the base implementation.
    /// </summary>
    /// <param name="cancellationToken">Token passed through to the base <c>StopAsync</c>.</param>
    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_kafkaBus is not null)
        {
            _logger.LogInformation("Stopping KafkaFlow bus...");
            await _kafkaBus.StopAsync();
        }

        await base.StopAsync(cancellationToken);
    }
}
```
## Consumer Configuration Options
| Option | Description | Recommended value |
|--------|-------------|---------|
| `WithBufferSize` | Internal message buffer size | 100 |
| `WithWorkersCount` | Parallel message processors | 1-4 |
| `WithAutoOffsetReset` | Starting position for new consumers | `Earliest` |
| `SessionTimeoutMs` | Consumer session timeout | 45000 |
| `SocketTimeoutMs` | Socket timeout | 30000 |
## Handler Patterns
### Simple Handler
```csharp
/// <summary>
/// Minimal typed handler: KafkaFlow routes messages to it by the generic argument
/// of <c>IMessageHandler&lt;TMessage&gt;</c>, which must match the <c>Handle</c> parameter.
/// </summary>
public class SimpleHandler : IMessageHandler<MyEvent>
{
    /// <summary>Processes one message; completes synchronously.</summary>
    public Task Handle(IMessageContext context, MyEvent message)
    {
        // Process message
        return Task.CompletedTask;
    }
}
```
### Handler with Scoped Services
```csharp
/// <summary>
/// Typed handler that opens a DI scope per message to resolve scoped services.
/// </summary>
public class ScopedHandler : IMessageHandler<MyEvent>
{
    private readonly IServiceProvider _serviceProvider;

    /// <param name="serviceProvider">Root provider used to open one scope per message.</param>
    public ScopedHandler(IServiceProvider serviceProvider)
    {
        _serviceProvider = serviceProvider;
    }

    /// <summary>Resolves a scoped session and saves changes for the message.</summary>
    public async Task Handle(IMessageContext context, MyEvent message)
    {
        // The scope, and every scoped service resolved from it, is disposed on exit.
        using var scope = _serviceProvider.CreateScope();

        // NOTE(review): the generic argument was lost from the original snippet;
        // IAsyncDocumentSession is assumed. Substitute the project's actual session/
        // unit-of-work type that exposes SaveChangesAsync(CancellationToken).
        var dbSession = scope.ServiceProvider.GetRequiredService<IAsyncDocumentSession>();

        // Use scoped services
        await dbSession.SaveChangesAsync(context.ConsumerContext.WorkerStopped);
    }
}
```
### Handler with Retry/DLQ
```csharp
/// <summary>
/// Shows how to separate transient failures (rethrown) from permanent ones
/// (logged and swallowed so the message is not reprocessed).
/// </summary>
public async Task Handle(IMessageContext context, MyEvent message)
{
    try
    {
        // Process message
    }
    catch (TransientException)
    {
        // Rethrow unchanged so the failure reaches whatever retry/DLQ handling
        // the consumer pipeline is configured with.
        throw;
    }
    catch (PermanentException ex)
    {
        // Log and swallow - don't retry
        _logger.LogError(ex, "Permanent failure for message");
    }
}
```
## Best Practices
1. **Make handlers idempotent** - Messages may be delivered more than once
2. **Use scoped services** - Create scope for each message
3. **Handle cancellation** - Use `context.ConsumerContext.WorkerStopped`
4. **Log appropriately** - Info for processing, Debug for success, Error for failures
5. **Re-throw for retries** - Only swallow permanent failures
6. **Keep handlers focused** - One handler per message type
## Testing
```csharp
// Verifies that handling a TrackDeletedEvent invalidates the track cache exactly once.
[Fact]
public async Task Handler_Should_InvalidateCache_OnTrackDeleted()
{
    // Arrange
    // NOTE(review): the generic argument was lost from the original snippet;
    // ICacheService is assumed. Use the interface the handler actually resolves.
    var cacheService = Substitute.For<ICacheService>();
    var serviceProvider = BuildServiceProvider(cacheService);
    var handler = new TrackDeletedHandler(serviceProvider, _logger);

    var message = new TrackDeletedEvent
    {
        TrackId = "01HXK...",
        UserId = "user123",
        ObjectKey = "tracks/01HXK...",
        FileSizeBytes = 1024,
        DeletedAt = DateTimeOffset.UtcNow,
        ScheduledDeletionAt = DateTimeOffset.UtcNow.AddDays(30),
        CorrelationId = Guid.NewGuid().ToString(),
        Timestamp = DateTimeOffset.UtcNow
    };

    // Act
    await handler.Handle(_mockContext, message);

    // Assert
    // The handler passes the worker-stopped CancellationToken, so match any token.
    await cacheService.Received(1)
        .InvalidateTrackCacheAsync(message.TrackId, message.UserId, Arg.Any<CancellationToken>());
}
```
## Topic Naming Convention
| Topic | Purpose | Producer | Consumer |
|-------|---------|----------|----------|
| `{prefix}-audio-events` | Audio upload notifications | Upload flow | Audio processor |
| `{prefix}-track-deletions` | Track deletion events | API service | Lifecycle worker |
| `{prefix}-minio-events` | MinIO bucket events | MinIO | Upload ingestor |