---
description: Add transactional outbox pattern for reliable event publishing with RavenDB (project)
---
# Add Outbox Pattern Skill
Implement the transactional outbox pattern for reliable event publishing in NovaTune using RavenDB.
## Overview
The outbox pattern ensures exactly-once event publishing by:
1. Writing events to an `OutboxMessages` collection in the same transaction as domain changes
2. A background processor reads and publishes events, then marks them as processed
3. Guarantees no lost events even if Kafka/Redpanda is temporarily unavailable
## Steps
### 1. Create OutboxMessage Model
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Models/OutboxMessage.cs`
```csharp
namespace NovaTuneApp.ApiService.Models;
///
/// Represents an event pending publication to the message broker.
///
public sealed class OutboxMessage
{
///
/// RavenDB document ID (e.g., "OutboxMessages/01HXK...")
///
public string Id { get; init; } = string.Empty;
///
/// Event type name for deserialization/routing.
///
public required string EventType { get; init; }
///
/// JSON-serialized event payload.
///
public required string Payload { get; init; }
///
/// Kafka partition key for ordering guarantees.
///
public required string PartitionKey { get; init; }
///
/// Target topic name (without prefix).
///
public string? Topic { get; init; }
///
/// When the outbox message was created.
///
public required DateTimeOffset CreatedAt { get; init; }
///
/// When the message was published (null if pending).
///
public DateTimeOffset? ProcessedAt { get; set; }
///
/// Number of publication attempts.
///
public int Attempts { get; set; }
///
/// Last error message if publication failed.
///
public string? LastError { get; set; }
}
```
### 2. Create RavenDB Index for Pending Messages
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Indexes/OutboxMessages_ByPending.cs`
```csharp
using Raven.Client.Documents.Indexes;
using NovaTuneApp.ApiService.Models;
namespace NovaTuneApp.ApiService.Infrastructure.Indexes;
public class OutboxMessages_ByPending : AbstractIndexCreationTask
{
public OutboxMessages_ByPending()
{
Map = messages => from msg in messages
where msg.ProcessedAt == null
select new
{
msg.CreatedAt,
msg.Attempts
};
}
}
```
### 3. Create Outbox Service Interface
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/IOutboxService.cs`
```csharp
namespace NovaTuneApp.ApiService.Services;
///
/// Service for writing events to the outbox.
///
public interface IOutboxService
{
///
/// Writes an event to the outbox within the current session.
/// Must be called before SaveChangesAsync().
///
Task WriteAsync(
TEvent @event,
string partitionKey,
string? topic = null,
CancellationToken ct = default) where TEvent : class;
}
```
### 4. Implement Outbox Service
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Services/OutboxService.cs`
```csharp
using System.Text.Json;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Models;
namespace NovaTuneApp.ApiService.Services;
public class OutboxService : IOutboxService
{
private readonly IAsyncDocumentSession _session;
private readonly ILogger _logger;
public OutboxService(
IAsyncDocumentSession session,
ILogger logger)
{
_session = session;
_logger = logger;
}
public async Task WriteAsync(
TEvent @event,
string partitionKey,
string? topic = null,
CancellationToken ct = default) where TEvent : class
{
var eventType = typeof(TEvent).Name;
var outboxMessage = new OutboxMessage
{
Id = $"OutboxMessages/{Ulid.NewUlid()}",
EventType = eventType,
Payload = JsonSerializer.Serialize(@event),
PartitionKey = partitionKey,
Topic = topic,
CreatedAt = DateTimeOffset.UtcNow
};
await _session.StoreAsync(outboxMessage, ct);
_logger.LogDebug(
"Queued {EventType} for partition {PartitionKey}",
eventType, partitionKey);
}
}
```
### 5. Create Outbox Processor Background Service
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Infrastructure/Services/OutboxProcessorService.cs`
```csharp
using System.Text.Json;
using KafkaFlow.Producers;
using Microsoft.Extensions.Options;
using Raven.Client.Documents;
using Raven.Client.Documents.Session;
using NovaTuneApp.ApiService.Configuration;
using NovaTuneApp.ApiService.Models;
using NovaTuneApp.ApiService.Infrastructure.Indexes;
namespace NovaTuneApp.ApiService.Infrastructure.Services;
public class OutboxProcessorService : BackgroundService
{
private readonly IServiceProvider _serviceProvider;
private readonly IOptions _options;
private readonly IOptions _novatuneOptions;
private readonly ILogger _logger;
public OutboxProcessorService(
IServiceProvider serviceProvider,
IOptions options,
IOptions novatuneOptions,
ILogger logger)
{
_serviceProvider = serviceProvider;
_options = options;
_novatuneOptions = novatuneOptions;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
if (!_options.Value.Enabled)
{
_logger.LogInformation("Outbox processor is disabled");
return;
}
_logger.LogInformation(
"Outbox processor starting with {Interval} interval",
_options.Value.PollingInterval);
while (!stoppingToken.IsCancellationRequested)
{
try
{
await ProcessBatchAsync(stoppingToken);
}
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox");
}
await Task.Delay(_options.Value.PollingInterval, stoppingToken);
}
}
private async Task ProcessBatchAsync(CancellationToken ct)
{
using var scope = _serviceProvider.CreateScope();
var store = scope.ServiceProvider.GetRequiredService();
var producerAccessor = scope.ServiceProvider.GetRequiredService();
using var session = store.OpenAsyncSession();
var pendingMessages = await session
.Query()
.Where(m => m.ProcessedAt == null && m.Attempts < _options.Value.MaxAttempts)
.OrderBy(m => m.CreatedAt)
.Take(_options.Value.BatchSize)
.ToListAsync(ct);
if (pendingMessages.Count == 0) return;
_logger.LogDebug("Processing {Count} outbox messages", pendingMessages.Count);
var topicPrefix = _novatuneOptions.Value.TopicPrefix;
foreach (var message in pendingMessages)
{
try
{
var topic = message.Topic ?? GetDefaultTopic(message.EventType);
var fullTopic = $"{topicPrefix}-{topic}";
var producer = producerAccessor.GetProducer("default");
await producer.ProduceAsync(
fullTopic,
message.PartitionKey,
message.Payload);
message.ProcessedAt = DateTimeOffset.UtcNow;
_logger.LogDebug(
"Published {EventType} to {Topic}",
message.EventType, fullTopic);
}
catch (Exception ex)
{
message.Attempts++;
message.LastError = ex.Message;
_logger.LogWarning(
ex,
"Failed to publish {EventType} (attempt {Attempt})",
message.EventType, message.Attempts);
}
}
await session.SaveChangesAsync(ct);
}
private static string GetDefaultTopic(string eventType) => eventType switch
{
nameof(TrackDeletedEvent) => "track-deletions",
nameof(AudioUploadedEvent) => "audio-events",
_ => "events"
};
}
```
### 6. Add Configuration Options
Location: `src/NovaTuneApp/NovaTuneApp.ApiService/Configuration/OutboxOptions.cs`
```csharp
namespace NovaTuneApp.ApiService.Configuration;
public class OutboxOptions
{
public const string SectionName = "Outbox";
///
/// Polling interval for outbox processor.
/// Default: 1 second.
///
public TimeSpan PollingInterval { get; set; } = TimeSpan.FromSeconds(1);
///
/// Maximum messages per batch.
/// Default: 100.
///
public int BatchSize { get; set; } = 100;
///
/// Maximum publication attempts before giving up.
/// Default: 5.
///
public int MaxAttempts { get; set; } = 5;
///
/// Whether outbox processing is enabled.
/// Default: true.
///
public bool Enabled { get; set; } = true;
///
/// Retention period for processed messages.
/// Default: 7 days.
///
public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromDays(7);
}
```
### 7. Register Services in Program.cs
```csharp
// Configuration
builder.Services.Configure(
builder.Configuration.GetSection(OutboxOptions.SectionName));
// Services
builder.Services.AddScoped();
// Background processor
builder.Services.AddHostedService();
```
### 8. Add Configuration to appsettings.json
```json
{
"Outbox": {
"PollingInterval": "00:00:01",
"BatchSize": 100,
"MaxAttempts": 5,
"Enabled": true,
"RetentionPeriod": "7.00:00:00"
}
}
```
## Usage Example
```csharp
public class TrackManagementService : ITrackManagementService
{
private readonly IAsyncDocumentSession _session;
private readonly IOutboxService _outboxService;
public async Task DeleteTrackAsync(string trackId, string userId, CancellationToken ct)
{
var track = await _session.LoadAsync