---
name: dotnet-messaging-patterns
description: >-
  Builds event-driven systems. Pub/sub, competing consumers, DLQ, sagas,
  delivery guarantees.
metadata:
  short-description: .NET skill guidance for architecture tasks
---

# dotnet-messaging-patterns

Durable messaging patterns for .NET event-driven architectures. Covers publish/subscribe, competing consumers, dead-letter queues, saga/process manager orchestration, and delivery guarantee strategies using Azure Service Bus, RabbitMQ, and MassTransit.

## Scope

- Publish/subscribe and competing consumer patterns
- Dead-letter queues and poison message handling
- Saga/process manager orchestration
- Delivery guarantee strategies (at-least-once, exactly-once)
- Azure Service Bus, RabbitMQ, and MassTransit integration

## Out of scope

- Background service lifecycle and IHostedService registration -- see [skill:dotnet-background-services]
- Resilience pipelines and retry policies -- see [skill:dotnet-resilience]
- JSON/binary serialization configuration -- see [skill:dotnet-serialization]
- In-process producer/consumer queues with Channel -- see [skill:dotnet-channels]

Cross-references: [skill:dotnet-background-services] for hosting message consumers, [skill:dotnet-resilience] for fault tolerance around message handlers, [skill:dotnet-serialization] for message envelope serialization, [skill:dotnet-channels] for in-process queuing patterns.

---

## Messaging Fundamentals

### Message Types

| Type         | Purpose                                     | Example                             |
| ------------ | ------------------------------------------- | ----------------------------------- |
| **Command**  | Request an action (one recipient)           | `PlaceOrder`, `ShipPackage`         |
| **Event**    | Notify something happened (many recipients) | `OrderPlaced`, `PaymentReceived`    |
| **Document** | Transfer data between systems               | `CustomerProfile`, `ProductCatalog` |

Commands are sent to a specific queue; events are published to a topic/exchange and delivered to all subscribers.
This distinction drives the choice between point-to-point and pub/sub topologies.

### Delivery Guarantees

| Guarantee         | Behavior                                                | Implementation                          |
| ----------------- | ------------------------------------------------------- | --------------------------------------- |
| **At-most-once**  | Fire and forget; message may be lost                    | No ack, no retry                        |
| **At-least-once** | Message retried until acknowledged; duplicates possible | Ack after processing + retry on failure |
| **Exactly-once**  | Each message processed exactly once                     | At-least-once + idempotent consumer     |

**At-least-once with idempotent consumers** is the standard approach for durable messaging. True exactly-once requires distributed transactions (which most brokers do not support) or consumer-side deduplication.

---

## Publish/Subscribe

### Azure Service Bus Topics

```csharp
// Publisher -- send event to a topic
await using var client = new ServiceBusClient(connectionString);
await using var sender = client.CreateSender("order-events");

var message = new ServiceBusMessage(
    JsonSerializer.SerializeToUtf8Bytes(new OrderPlaced(orderId, total)))
{
    Subject = nameof(OrderPlaced),
    ContentType = "application/json",
    MessageId = Guid.NewGuid().ToString()
};

await sender.SendMessageAsync(message, cancellationToken);
```

```csharp
// Subscriber -- process events from a subscription
await using var processor = client.CreateProcessor(
    topicName: "order-events",
    subscriptionName: "billing-service",
    new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls = 10,
        AutoCompleteMessages = false
    });

processor.ProcessMessageAsync += async args =>
{
    // Deserialize the JSON body into the message contract
    var body = args.Message.Body.ToObjectFromJson<OrderPlaced>();
    await HandleOrderPlacedAsync(body);

    // Acknowledge only after the handler has succeeded
    await args.CompleteMessageAsync(args.Message);
};

processor.ProcessErrorAsync += args =>
{
    logger.LogError(args.Exception, "Error processing message");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync(cancellationToken);
```

**Key packages:**

```xml
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.*" />
```

### RabbitMQ Fanout Exchange

```csharp
// Publisher -- declare exchange and publish
var factory = new ConnectionFactory { HostName = "localhost" };
await using var connection = await factory.CreateConnectionAsync();
await using var channel = await connection.CreateChannelAsync();

await channel.ExchangeDeclareAsync(
    exchange: "order-events",
    type: ExchangeType.Fanout,
    durable: true);

var body = JsonSerializer.SerializeToUtf8Bytes(
    new OrderPlaced(orderId, total));

// Fanout exchanges ignore the routing key, so it is left empty
await channel.BasicPublishAsync(
    exchange: "order-events",
    routingKey: string.Empty,
    body: body);
```

**Key packages:**

```xml
<PackageReference Include="RabbitMQ.Client" Version="7.*" />
```

### MassTransit Publish

MassTransit abstracts the broker, providing a unified API for Azure Service Bus, RabbitMQ, Amazon SQS, and in-memory transport.

```csharp
// Registration
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderPlacedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("guest");
            h.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

// Publisher
public sealed class OrderService(IPublishEndpoint publishEndpoint)
{
    public async Task PlaceOrderAsync(
        Guid orderId, decimal total, CancellationToken ct)
    {
        // Process order...
        await publishEndpoint.Publish(
            new OrderPlaced(orderId, total), ct);
    }
}

// Consumer
public sealed class OrderPlacedConsumer(
    ILogger<OrderPlacedConsumer> logger)
    : IConsumer<OrderPlaced>
{
    public async Task Consume(ConsumeContext<OrderPlaced> context)
    {
        logger.LogInformation(
            "Processing order {OrderId}", context.Message.OrderId);
        await ProcessAsync(context.Message);
    }
}

// Message contract (use records in a shared contracts assembly)
public record OrderPlaced(Guid OrderId, decimal Total);
```

**Key packages:**

```xml
<PackageReference Include="MassTransit" Version="8.*" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.*" />
```

---

## Competing Consumers

Multiple consumer instances process messages from the same queue in parallel. The broker delivers each message to exactly one consumer, distributing load across instances.

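The delivery rule above can be illustrated without a broker. The following is a minimal in-process sketch, assuming a `System.Threading.Channels` channel as a stand-in for the broker queue; the consumer names and message ids are hypothetical, and a real deployment would use one of the broker clients covered in this skill.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-process stand-in for a broker queue (assumption for this sketch only).
var queue = Channel.CreateUnbounded<int>();

// Every delivery is logged as (message id, consumer name).
var deliveries = new ConcurrentBag<(int Id, string Consumer)>();

// One consumer instance: takes the next available message until the queue completes.
async Task ConsumeAsync(string consumerName)
{
    await foreach (var messageId in queue.Reader.ReadAllAsync())
    {
        deliveries.Add((messageId, consumerName));
        await Task.Delay(5); // simulate work so other instances get a turn
    }
}

// Three instances compete for messages from the same queue.
var consumers = new[] { "A", "B", "C" }
    .Select(name => ConsumeAsync(name))
    .ToArray();

for (var i = 1; i <= 9; i++)
    await queue.Writer.WriteAsync(i);
queue.Writer.Complete();

await Task.WhenAll(consumers);

// Report how many deliveries happened and which instance handled which ids.
Console.WriteLine($"Deliveries: {deliveries.Count}");
foreach (var g in deliveries.GroupBy(d => d.Consumer).OrderBy(g => g.Key))
    Console.WriteLine($"{g.Key}: {string.Join(", ", g.Select(d => d.Id).OrderBy(id => id))}");
```

The delivery count always equals the number of messages written, but which instance handles which id varies from run to run. That variability is the ordering trade-off competing consumers make.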
### Pattern

```text
Queue: order-processing
├── Consumer Instance A (picks message 1)
├── Consumer Instance B (picks message 2)
└── Consumer Instance C (picks message 3)
```

### Azure Service Bus -- Scaling Consumers

```csharp
// Multiple instances reading from the same queue automatically compete.
// MaxConcurrentCalls controls per-instance parallelism.
var processor = client.CreateProcessor("order-processing",
    new ServiceBusProcessorOptions
    {
        MaxConcurrentCalls = 20,
        PrefetchCount = 50,
        AutoCompleteMessages = false
    });
```

### MassTransit -- Concurrency Limits

```csharp
// Limit how many messages this consumer handles concurrently per instance
x.AddConsumer<OrderPlacedConsumer>(cfg =>
{
    cfg.UseConcurrentMessageLimit(10);
});
```

### Ordering Considerations

Competing consumers sacrifice strict ordering for throughput. When order matters:

- **Azure Service Bus**: Use sessions (`RequiresSession = true`) to guarantee FIFO within a session ID (e.g., per customer)
- **RabbitMQ**: Use a single consumer per queue, or a consistent-hash exchange to partition by key
- **MassTransit**: Configure `UsePartitioner` for key-based ordering

---

## Dead-Letter Queues

Dead-letter queues (DLQs) capture messages that cannot be processed after exhausting retries. They prevent poison messages from blocking the main queue.

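The retry-then-park behavior can be sketched in-process as follows. This is an illustration under stated assumptions, not broker code: `MaxDeliveryCount`, the message bodies, and the `Handle` function are hypothetical, and real brokers track delivery counts and move messages to the DLQ themselves.

```csharp
using System;
using System.Collections.Generic;

const int MaxDeliveryCount = 3; // assumed threshold; brokers make this configurable

var mainQueue = new Queue<(string Body, int DeliveryCount)>();
var deadLetters = new List<(string Body, string Reason)>();
var processed = new List<string>();

mainQueue.Enqueue(("order-1", 0));
mainQueue.Enqueue(("poison", 0));
mainQueue.Enqueue(("order-2", 0));

// Hypothetical handler: fails on every attempt for the poison message.
void Handle(string body)
{
    if (body == "poison")
        throw new FormatException("Cannot parse message body");
    processed.Add(body);
}

while (mainQueue.Count > 0)
{
    var (body, deliveryCount) = mainQueue.Dequeue();
    deliveryCount++;

    try
    {
        Handle(body);
    }
    catch (Exception ex) when (deliveryCount >= MaxDeliveryCount)
    {
        // Retries exhausted: park the message so it stops cycling through the queue.
        deadLetters.Add((body, ex.Message));
    }
    catch (Exception)
    {
        // Attempts remain: put the message back for redelivery.
        mainQueue.Enqueue((body, deliveryCount));
    }
}

Console.WriteLine($"Processed: {string.Join(", ", processed)}");
Console.WriteLine($"Dead-lettered: {deadLetters.Count}");
```

Both healthy messages are processed while the poison message is retried in between, and after the third failed attempt it lands in `deadLetters` with the failure reason attached for later inspection.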
### Why Messages Are Dead-Lettered

| Reason                         | Trigger                                      |
| ------------------------------ | -------------------------------------------- |
| Max delivery attempts exceeded | Message failed processing N times            |
| TTL expired                    | Message sat in queue past its time-to-live   |
| Consumer rejection             | Consumer explicitly dead-letters the message |
| Queue length exceeded          | Queue overflow policy routes to DLQ          |

### Azure Service Bus DLQ

```csharp
// Dead-letter a message with reason
await args.DeadLetterMessageAsync(
    args.Message,
    deadLetterReason: "ValidationFailed",
    deadLetterErrorDescription: "Missing required field: CustomerId");

// Read from the dead-letter sub-queue
await using var dlqReceiver = client.CreateReceiver(
    "order-processing",
    new ServiceBusReceiverOptions
    {
        SubQueue = SubQueue.DeadLetter
    });

while (true)
{
    // Returns null when no message arrives within the wait time
    var message = await dlqReceiver.ReceiveMessageAsync(
        TimeSpan.FromSeconds(5), cancellationToken);
    if (message is null) break;

    logger.LogWarning(
        "DLQ message: {Reason} - {Description}",
        message.DeadLetterReason,
        message.DeadLetterErrorDescription);

    // Inspect, fix, and re-submit or discard
    await dlqReceiver.CompleteMessageAsync(message);
}
```

### MassTransit Error/Fault Queues

MassTransit automatically creates `_error` and `_skipped` queues. Messages that still fail after retries are exhausted move to the error queue with fault metadata.

```csharp
// Configure retry before dead-lettering
x.AddConsumer<OrderPlacedConsumer>(cfg =>
{
    cfg.UseMessageRetry(r => r.Intervals(
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(5),
        TimeSpan.FromSeconds(15)));
});
```

### DLQ Monitoring

Always alert on DLQ depth. An unmonitored DLQ grows silently until its messages are lost or too stale to act on.

---

## Saga / Process Manager

Sagas coordinate multi-step business processes across services. Each step publishes events that trigger the next step, with compensation logic for failures.

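The compensation idea can be sketched in-process before looking at a full state machine. This is a simplified illustration, assuming synchronous steps run by a single coordinator; the step names are hypothetical, and in a real saga each action would send a command to another service and wait for its event.

```csharp
using System;
using System.Collections.Generic;

var log = new List<string>();

// Each step pairs a forward action with the action that undoes it.
// Step names are hypothetical and stand in for commands sent to other services.
var steps = new (string Name, Action Execute, Action Compensate)[]
{
    ("ChargePayment",
        () => log.Add("ChargePayment"),
        () => log.Add("RefundPayment")),
    ("ReserveInventory",
        () => throw new InvalidOperationException("Out of stock"),
        () => log.Add("ReleaseInventory")),
    ("ShipOrder",
        () => log.Add("ShipOrder"),
        () => log.Add("CancelShipment")),
};

// Compensations for the steps that have completed so far, most recent on top.
var completed = new Stack<Action>();
var succeeded = true;

foreach (var step in steps)
{
    try
    {
        step.Execute();
        completed.Push(step.Compensate);
    }
    catch (Exception ex)
    {
        log.Add($"{step.Name} failed: {ex.Message}");
        succeeded = false;
        break;
    }
}

if (!succeeded)
{
    // Undo completed steps in reverse order.
    while (completed.Count > 0)
        completed.Pop()();
}

Console.WriteLine(string.Join(Environment.NewLine, log));
```

Only the payment step completed before the failure, so only its compensation runs; the failed step and the steps after it need no undo. A durable saga would also persist its progress between steps so that compensation survives a restart.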
### Choreography vs Orchestration

| Style             | How it works                                                   | Use when                                                |
| ----------------- | -------------------------------------------------------------- | ------------------------------------------------------- |
| **Choreography**  | Services react to events independently; no central coordinator | Simple flows, few steps, loosely coupled                |
| **Orchestration** | A saga/process manager directs each step                       | Complex flows, compensation needed, visibility required |

### MassTransit State Machine Saga

```csharp
// Saga state
public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = default!;
    public Guid OrderId { get; set; }
    public decimal Total { get; set; }
    public DateTime? PaymentReceivedAt { get; set; }
}

// State machine definition
public sealed class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; } = default!;
    public State PaymentPending { get; private set; } = default!;
    public State Completed { get; private set; } = default!;
    public State Faulted { get; private set; } = default!;

    public Event<OrderSubmitted> OrderSubmitted { get; private set; } = default!;
    public Event<PaymentReceived> PaymentReceived { get; private set; } = default!;
    public Event<PaymentFailed> PaymentFailed { get; private set; } = default!;

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // Correlate every event to its saga instance by order ID
        Event(() => OrderSubmitted,
            x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentReceived,
            x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentFailed,
            x => x.CorrelateById(ctx => ctx.Message.OrderId));

        Initially(
            When(OrderSubmitted)
                .Then(ctx =>
                {
                    ctx.Saga.OrderId = ctx.Message.OrderId;
                    ctx.Saga.Total = ctx.Message.Total;
                })
                .Publish(ctx => new RequestPayment(
                    ctx.Saga.OrderId, ctx.Saga.Total))
                .TransitionTo(PaymentPending));

        During(PaymentPending,
            When(PaymentReceived)
                .Then(ctx =>
                    ctx.Saga.PaymentReceivedAt = DateTime.UtcNow)
                .Publish(ctx => new FulfillOrder(ctx.Saga.OrderId))
                .TransitionTo(Completed),
            When(PaymentFailed)
                .Publish(ctx => new CancelOrder(ctx.Saga.OrderId))
                .TransitionTo(Faulted));
    }
}

// Registration -- requires MassTransit.EntityFrameworkCore package for EF persistence
// NuGet: MassTransit.EntityFrameworkCore Version="8.*"
builder.Services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            // OrderSagaDbContext is a placeholder name for the application's DbContext
            r.ExistingDbContext<OrderSagaDbContext>();
            r.UsePostgres();
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
```

### Saga Persistence

| Store                 | Package                           | Use when                                 |
| --------------------- | --------------------------------- | ---------------------------------------- |
| Entity Framework Core | `MassTransit.EntityFrameworkCore` | Already using EF Core; need transactions |
| MongoDB               | `MassTransit.MongoDb`             | Document-oriented state; high throughput |
| Redis                 | `MassTransit.Redis`               | Ephemeral sagas; low latency             |
| In-Memory             | Built-in                          | Testing only -- state lost on restart    |

### Compensation Pattern

When a saga step fails, publish compensating commands to undo prior steps:

```text
OrderSubmitted -> RequestPayment -> PaymentReceived -> ReserveInventory
                                                             |
                                                      InventoryFailed
                                                             |
                                                      RefundPayment (compensation)
                                                             |
                                                      CancelOrder (compensation)
```

---

## Idempotent Consumers

At-least-once delivery means consumers may receive the same message multiple times. Idempotent consumers ensure repeated processing produces the same result.

### Database-Based Deduplication

```csharp
public sealed class IdempotentOrderConsumer(
    AppDbContext db,
    ILogger<IdempotentOrderConsumer> logger)
    : IConsumer<OrderPlaced>
{
    public async Task Consume(ConsumeContext<OrderPlaced> context)
    {
        var messageId = context.MessageId
            ?? throw new InvalidOperationException("Missing MessageId");

        // Check if already processed
        var exists = await db.ProcessedMessages
            .AnyAsync(m => m.MessageId == messageId, context.CancellationToken);

        if (exists)
        {
            logger.LogInformation(
                "Duplicate message {MessageId}, skipping", messageId);
            return;
        }

        // Process the message
        await ProcessOrderAsync(context.Message);

        // Record as processed; a unique index on MessageId makes a concurrent
        // duplicate fail here instead of being recorded twice
        db.ProcessedMessages.Add(new ProcessedMessage
        {
            MessageId = messageId,
            ProcessedAt = DateTime.UtcNow,
            ConsumerType = nameof(IdempotentOrderConsumer)
        });

        await db.SaveChangesAsync(context.CancellationToken);
    }
}
```

### Natural Idempotency

Prefer operations that are naturally idempotent:

- **Upserts** (`INSERT ... ON CONFLICT DO UPDATE`) instead of blind inserts
- **Conditional updates** (`UPDATE ... WHERE Status = 'Pending'`) instead of unconditional updates
- **Deterministic IDs** derived from message content instead of auto-generated IDs

---

## Message Envelope Pattern

Wrap message payloads in a standard envelope with metadata for tracing, versioning, and routing.

```csharp
public sealed record MessageEnvelope<T>(
    string MessageId,
    string MessageType,
    DateTimeOffset Timestamp,
    string CorrelationId,
    string Source,
    int Version, // Schema version for backward-compatible deserialization
    T Payload);
```

MassTransit provides this automatically via `ConsumeContext` (MessageId, CorrelationId, Headers). When using raw broker clients, implement envelopes explicitly.

---

## Agent Gotchas

1. **Do not use auto-complete with Azure Service Bus** -- set `AutoCompleteMessages = false` and call `CompleteMessageAsync` after successful processing. With auto-complete, the processor completes the message as soon as the handler returns without throwing, so a handler that swallows an exception or hands work to a background task loses the message.
2. **Do not forget to handle poison messages** -- always configure max delivery count and DLQ monitoring. Without these, a single bad message can be redelivered indefinitely and block the queue.
3. **Do not use in-memory saga persistence in production** -- saga state is lost on restart, leaving business processes in unknown states.
   Use Entity Framework, MongoDB, or Redis persistence.
4. **Do not assume message ordering across partitions** -- competing consumers and topic subscriptions may deliver messages out of order. Use sessions or partitioning when order matters.
5. **Do not skip idempotency for at-least-once consumers** -- brokers may redeliver on timeout, network glitch, or consumer restart. Every consumer must handle duplicate messages safely.
6. **Do not hardcode connection strings** -- use environment variables or Azure Key Vault references. For local development, use user secrets or `.env` files excluded from source control.

---

## References

- [Azure Service Bus documentation](https://learn.microsoft.com/en-us/azure/service-bus-messaging/)
- [Azure Service Bus client library for .NET](https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.servicebus-readme)
- [RabbitMQ .NET client documentation](https://www.rabbitmq.com/client-libraries/dotnet-api-guide)
- [MassTransit documentation](https://masstransit.io/documentation/concepts)
- [MassTransit sagas](https://masstransit.io/documentation/patterns/saga)
- [Enterprise Integration Patterns](https://www.enterpriseintegrationpatterns.com/)

## Code Navigation (Serena MCP)

**Primary approach:** Use Serena symbol operations for efficient code navigation:

1. **Find definitions**: `serena_find_symbol` instead of text search
2. **Understand structure**: `serena_get_symbols_overview` for file organization
3. **Track references**: `serena_find_referencing_symbols` for impact analysis
4. **Precise edits**: `serena_replace_symbol_body` for clean modifications

**When to use Serena vs traditional tools:**

- **Use Serena**: Navigation, refactoring, dependency analysis, precise edits
- **Use Read/Grep**: Reading full files, pattern matching, simple text operations
- **Fallback**: If Serena is unavailable, traditional tools work fine

**Example workflow:**

```text
# Instead of:
Read: src/Services/OrderService.cs
Grep: "public void ProcessOrder"

# Use:
serena_find_symbol: "OrderService/ProcessOrder"
serena_get_symbols_overview: "src/Services/OrderService.cs"
```