--- name: add-outbox-pattern description: Add transactional outbox pattern for reliable event publishing with RavenDB (project) --- # Add Outbox Pattern Skill Implement the transactional outbox pattern for reliable event publishing in NovaTune using RavenDB. ## Overview The outbox pattern ensures exactly-once event publishing by: 1. Writing events to an `OutboxMessages` collection in the same transaction as domain changes 2. A background processor reads and publishes events, then marks them as processed 3. Guarantees no lost events even if Kafka/Redpanda is temporarily unavailable ## Steps ### 1. Create OutboxMessage Model Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Models/OutboxMessage.cs` ```csharp namespace NovaTuneApp.ApiService.Models; /// /// Represents an event pending publication to the message broker. /// public sealed class OutboxMessage { /// /// RavenDB document ID (e.g., "OutboxMessages/01HXK...") /// public string Id { get; init; } = string.Empty; /// /// Event type name for deserialization/routing. /// public required string EventType { get; init; } /// /// JSON-serialized event payload. /// public required string Payload { get; init; } /// /// Kafka partition key for ordering guarantees. /// public required string PartitionKey { get; init; } /// /// Target topic name (without prefix). /// public string? Topic { get; init; } /// /// When the outbox message was created. /// public required DateTimeOffset CreatedAt { get; init; } /// /// When the message was published (null if pending). /// public DateTimeOffset? ProcessedAt { get; set; } /// /// Number of publication attempts. /// public int Attempts { get; set; } /// /// Last error message if publication failed. /// public string? LastError { get; set; } } ``` ### 2. Create RavenDB Index for Pending Messages Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Indexes/OutboxMessages_ByPending.cs` ```csharp using Raven.Client.Documents.Indexes; using NovaTuneApp.ApiService.Models; namespace NovaTuneApp.ApiService.Infrastructure.Indexes; public class OutboxMessages_ByPending : AbstractIndexCreationTask { public OutboxMessages_ByPending() { Map = messages => from msg in messages where msg.ProcessedAt == null select new { msg.CreatedAt, msg.Attempts }; } } ``` ### 3. Create Outbox Service Interface Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/IOutboxService.cs` ```csharp namespace NovaTuneApp.ApiService.Services; /// /// Service for writing events to the outbox. /// public interface IOutboxService { /// /// Writes an event to the outbox within the current session. /// Must be called before SaveChangesAsync(). /// Task WriteAsync( TEvent @event, string partitionKey, string? topic = null, CancellationToken ct = default) where TEvent : class; } ``` ### 4. Implement Outbox Service Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/OutboxService.cs` ```csharp using System.Text.Json; using Raven.Client.Documents.Session; using NovaTuneApp.ApiService.Models; namespace NovaTuneApp.ApiService.Services; public class OutboxService : IOutboxService { private readonly IAsyncDocumentSession _session; private readonly ILogger _logger; public OutboxService( IAsyncDocumentSession session, ILogger logger) { _session = session; _logger = logger; } public async Task WriteAsync( TEvent @event, string partitionKey, string? topic = null, CancellationToken ct = default) where TEvent : class { var eventType = typeof(TEvent).Name; var outboxMessage = new OutboxMessage { Id = $"OutboxMessages/{Ulid.NewUlid()}", EventType = eventType, Payload = JsonSerializer.Serialize(@event), PartitionKey = partitionKey, Topic = topic, CreatedAt = DateTimeOffset.UtcNow }; await _session.StoreAsync(outboxMessage, ct); _logger.LogDebug( "Queued {EventType} for partition {PartitionKey}", eventType, partitionKey); } } ``` ### 5. Create Outbox Processor Background Service Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Services/OutboxProcessorService.cs` ```csharp using System.Text.Json; using KafkaFlow.Producers; using Microsoft.Extensions.Options; using Raven.Client.Documents; using Raven.Client.Documents.Session; using NovaTuneApp.ApiService.Configuration; using NovaTuneApp.ApiService.Models; using NovaTuneApp.ApiService.Infrastructure.Indexes; namespace NovaTuneApp.ApiService.Infrastructure.Services; public class OutboxProcessorService : BackgroundService { private readonly IServiceProvider _serviceProvider; private readonly IOptions _options; private readonly IOptions _novatuneOptions; private readonly ILogger _logger; public OutboxProcessorService( IServiceProvider serviceProvider, IOptions options, IOptions novatuneOptions, ILogger logger) { _serviceProvider = serviceProvider; _options = options; _novatuneOptions = novatuneOptions; _logger = logger; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { if (!_options.Value.Enabled) { _logger.LogInformation("Outbox processor is disabled"); return; } _logger.LogInformation( "Outbox processor starting with {Interval} interval", _options.Value.PollingInterval); while (!stoppingToken.IsCancellationRequested) { try { await ProcessBatchAsync(stoppingToken); } catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested) { break; } catch (Exception ex) { _logger.LogError(ex, "Error processing outbox"); } await Task.Delay(_options.Value.PollingInterval, stoppingToken); } } private async Task ProcessBatchAsync(CancellationToken ct) { using var scope = _serviceProvider.CreateScope(); var store = scope.ServiceProvider.GetRequiredService(); var producerAccessor = scope.ServiceProvider.GetRequiredService(); using var session = store.OpenAsyncSession(); var pendingMessages = await session .Query() .Where(m => m.ProcessedAt == null && m.Attempts < _options.Value.MaxAttempts) .OrderBy(m => m.CreatedAt) .Take(_options.Value.BatchSize) .ToListAsync(ct); if (pendingMessages.Count == 0) return; _logger.LogDebug("Processing {Count} outbox messages", pendingMessages.Count); var topicPrefix = _novatuneOptions.Value.TopicPrefix; foreach (var message in pendingMessages) { try { var topic = message.Topic ?? GetDefaultTopic(message.EventType); var fullTopic = $"{topicPrefix}-{topic}"; var producer = producerAccessor.GetProducer("default"); await producer.ProduceAsync( fullTopic, message.PartitionKey, message.Payload); message.ProcessedAt = DateTimeOffset.UtcNow; _logger.LogDebug( "Published {EventType} to {Topic}", message.EventType, fullTopic); } catch (Exception ex) { message.Attempts++; message.LastError = ex.Message; _logger.LogWarning( ex, "Failed to publish {EventType} (attempt {Attempt})", message.EventType, message.Attempts); } } await session.SaveChangesAsync(ct); } private static string GetDefaultTopic(string eventType) => eventType switch { nameof(TrackDeletedEvent) => "track-deletions", nameof(AudioUploadedEvent) => "audio-events", _ => "events" }; } ``` ### 6. Add Configuration Options Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Configuration/OutboxOptions.cs` ```csharp namespace NovaTuneApp.ApiService.Configuration; public class OutboxOptions { public const string SectionName = "Outbox"; /// /// Polling interval for outbox processor. /// Default: 1 second. /// public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(1); /// /// Maximum messages per batch. /// Default: 100. /// public int BatchSize { get; set; } = 100; /// /// Maximum publication attempts before giving up. /// Default: 5. /// public int MaxAttempts { get; set; } = 5; /// /// Whether outbox processing is enabled. /// Default: true. /// public bool Enabled { get; set; } = true; /// /// Retention period for processed messages. /// Default: 7 days. /// public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromDays(7); } ``` ### 7. Register Services in Program.cs ```csharp // Configuration builder.Services.Configure( builder.Configuration.GetSection(OutboxOptions.SectionName)); // Services builder.Services.AddScoped(); // Background processor builder.Services.AddHostedService(); ``` ### 8. Add Configuration to appsettings.json ```json { "Outbox": { "PollingInterval": "00:00:01", "BatchSize": 100, "MaxAttempts": 5, "Enabled": true, "RetentionPeriod": "7.00:00:00" } } ``` ## Usage Example ```csharp public class TrackManagementService : ITrackManagementService { private readonly IAsyncDocumentSession _session; private readonly IOutboxService _outboxService; public async Task DeleteTrackAsync(string trackId, string userId, CancellationToken ct) { var track = await _session.LoadAsync($"Tracks/{trackId}", ct); // ... validation ... // Soft-delete track track.Status = TrackStatus.Deleted; track.DeletedAt = DateTimeOffset.UtcNow; track.ScheduledDeletionAt = track.DeletedAt.Value.AddDays(30); // Write event to outbox (same transaction) var evt = new TrackDeletedEvent { TrackId = trackId, UserId = userId, ObjectKey = track.ObjectKey, // ... other fields }; await _outboxService.WriteAsync(evt, partitionKey: trackId, ct: ct); // Both track update and outbox message saved atomically await _session.SaveChangesAsync(ct); } } ``` ## Benefits - **Exactly-once delivery**: Events stored atomically with domain changes - **Resilience**: Events published even if broker temporarily unavailable - **Ordering**: Partition key ensures order within entity - **Retries**: Failed messages retried with exponential backoff - **Observability**: Failed messages visible in RavenDB ## Cleanup Add a scheduled task to delete processed messages older than retention period: ```csharp // In OutboxProcessorService or separate cleanup service var cutoff = DateTimeOffset.UtcNow - _options.Value.RetentionPeriod; var oldMessages = await session .Query() .Where(m => m.ProcessedAt != null && m.ProcessedAt < cutoff) .Take(1000) .ToListAsync(ct); foreach (var msg in oldMessages) session.Delete(msg); await session.SaveChangesAsync(ct); ```