# RabbitMQ transport provider for SlimMessageBus

Please read the [Introduction](intro.md) before reading this provider documentation.

- [Underlying client](#underlying-client)
- [Concepts](#concepts)
- [Configuration](#configuration)
  - [Producers](#producers)
  - [Consumers](#consumers)
    - [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support)
      - [Basic Routing Keys](#basic-routing-keys)
      - [Wildcard Routing Keys](#wildcard-routing-keys)
    - [Acknowledgment Mode](#acknowledgment-mode)
    - [Consumer Error Handling](#consumer-error-handling)
      - [Dead Letter Exchange](#dead-letter-exchange)
      - [Custom Consumer Error Handler](#custom-consumer-error-handler)
    - [Consumer Concurrency Level](#consumer-concurrency-level)
  - [Request-Response](#request-response)
- [Topology Provisioning](#topology-provisioning)
- [Default Exchange](#default-exchange)
  - [Why it exists](#why-it-exists)
- [Connection Resiliency](#connection-resiliency)
- [Recipes](#recipes)
  - [01 Multiple consumers on the same queue with different concurrency](#01-multiple-consumers-on-the-same-queue-with-different-concurrency)
- [Feedback](#feedback)

## Underlying client

The [`RabbitMQ`](https://www.nuget.org/packages/SlimMessageBus.Host.RabbitMQ) transport provider uses the [RabbitMQ.Client](https://www.nuget.org/packages/RabbitMQ.Client/) library to connect to the RabbitMQ cluster via the AMQP protocol.

## Concepts

RabbitMQ and the AMQP protocol introduce a couple of concepts:

- Exchange - entities to which producers send messages,
- Queue - mailboxes from which consumers read messages,
- Binding - rules that exchanges use to route messages to queues.

[AMQP Concepts](https://www.rabbitmq.com/tutorials/amqp-concepts.html) provides a brilliant overview.

## Configuration

The RabbitMQ transport configuration is arranged via the `.WithProviderRabbitMQ(cfg => {})` method on the message bus builder.
```cs
using SlimMessageBus.Host.RabbitMQ;

// Register SlimMessageBus in MSDI
services.AddSlimMessageBus((mbb) =>
{
    // Use RabbitMQ transport provider
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // Connect using AMQP URI
        cfg.ConnectionString = configuration["RabbitMQ:ConnectionString"];

        // Alternatively, when not using AMQP URI:
        // cfg.ConnectionFactory.HostName = "..."
        // cfg.ConnectionFactory.VirtualHost = "..."
        // cfg.ConnectionFactory.UserName = "..."
        // cfg.ConnectionFactory.Password = "..."
        // cfg.ConnectionFactory.Ssl.Enabled = true

        // Fine tune the underlying RabbitMQ.Client:
        // cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
    });

    mbb.AddServicesFromAssemblyContaining();
    mbb.AddJsonSerializer();
});
```

The relevant elements of the `cfg`:

- The `ConnectionString` sets the AMQP URI. This property is a convenience wrapper on top of `ConnectionFactory.Uri` from the underlying client library. The URI has the following form: `amqps://<username>:<password>@<host>/<virtual-host>`.
- The `ConnectionFactory` gives access to other client settings. It can be used to set up other connection details in case the AMQP URI cannot be used or there is a need to fine tune the client. For more options see the underlying [RabbitMQ driver docs](https://www.rabbitmq.com/dotnet-api-guide.html#connecting).

### Producers

Producers need to declare the name and type of the exchange the message should be delivered to. SMB will provision the specified exchange. Additionally, we can specify:

- the modifier that assigns message properties (`MessageId`, `ContentType`, and headers),
- the message key provider that is used in routing for relevant exchange types.
```cs
mbb.Produce(x => x
    // Will declare an orders exchange of type Fanout
    .Exchange("orders", exchangeType: ExchangeType.Fanout)
    // Will use a routing key provider that for a given message will take its Id field
    .RoutingKeyProvider((m, p) => m.Id.ToString())
    // Will use a modifier that assigns the message properties
    .MessagePropertiesModifier((m, p) =>
    {
        p.MessageId = GetMessageId(m);
    }));
```

We can also set defaults for all producers on the bus level:

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // All exchanges declared on producers will be durable by default
        cfg.UseExchangeDefaults(durable: true);

        // All messages will get the ContentType message property assigned
        cfg.UseMessagePropertiesModifier((m, p) =>
        {
            p.ContentType = MediaTypeNames.Application.Json;
        });
    });

    mbb.AddJsonSerializer();
});
```

### Consumers

Consumers need to specify the name of the queue they read from. SMB will provision the specified queue. Additionally,

- when the exchange name binding is specified, SMB will provision that binding with the broker,
- when a [dead letter exchange](#dead-letter-exchange) is specified, the queue will be provisioned with a reference to it, and if the exchange type is specified the dead letter exchange will also be provisioned.
```cs
mbb.Consume(x => x
    // Use the subscriber queue, do not auto delete
    .Queue("subscriber", autoDelete: false)
    // Bind the queue to the ping exchange
    .ExchangeBinding("ping")
    // The queue declaration in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be created
    .DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType.Direct)
    .WithConsumer());
```

We can specify defaults for all consumers on the bus level:

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
        cfg.UseQueueDefaults(durable: false);
    });
});
```

#### Routing Keys and Wildcard Support

RabbitMQ routing keys are used by exchanges to determine which queues should receive a message. SlimMessageBus fully supports RabbitMQ's routing key semantics, including **wildcard routing keys** for topic exchanges.

##### Basic Routing Keys

For direct and topic exchanges, you can specify exact routing keys when binding consumers:

```cs
mbb.Consume(x => x
    .Queue("orders-queue")
    .ExchangeBinding("orders-exchange", routingKey: "orders.created")
    .WithConsumer());
```

##### Wildcard Routing Keys

For topic exchanges, SlimMessageBus supports RabbitMQ's wildcard routing key patterns:

- **`*` (asterisk)** - matches exactly one segment
- **`#` (hash)** - matches zero or more segments
- Segments are separated by `.` (dot)

**Examples:**

```cs
services.AddSlimMessageBus(mbb =>
{
    // Producer sends messages with specific routing keys
    mbb.Produce(x => x
        .Exchange("regions", exchangeType: ExchangeType.Topic)
        .RoutingKeyProvider((m, p) => $"regions.{m.Country}.cities.{m.City}"));

    // Consumer 1: Match all cities in North America
    mbb.Consume(x => x
        .Queue("na-cities-queue")
        .ExchangeBinding("regions", routingKey: "regions.na.cities.*") // * matches exactly one city
        .WithConsumer());

    // Consumer 2: Match all events in the regions exchange
    mbb.Consume(x => x
        .Queue("all-regions-queue")
        .ExchangeBinding("regions", routingKey: "regions.#") // # matches zero or more segments
        .WithConsumer());

    // Consumer 3: Match all audit events with any number of segments
    mbb.Consume(x => x
        .Queue("audit-queue")
        .ExchangeBinding("audit", routingKey: "audit.events.#")
        .WithConsumer());

    // Consumer 4: Complex pattern - match region events ending with specific pattern
    mbb.Consume(x => x
        .Queue("region-reports-queue")
        .ExchangeBinding("regions", routingKey: "regions.*.reports.*") // matches regions.{country}.reports.{type}
        .WithConsumer());
});
```

**Routing Key Pattern Examples:**

| Pattern | Matches | Doesn't Match |
|---------|---------|---------------|
| `regions.na.cities.*` | `regions.na.cities.toronto`<br>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br>`regions.na.cities.toronto.downtown` (extra segment) |
| `audit.events.#` | `audit.events.users.signup`<br>`audit.events.orders.placed`<br>`audit.events` | `audit.users` (wrong prefix) |
| `orders.#.region.*` | `orders.processed.region.na`<br>`orders.created.cancelled.region.eu`<br>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br>`orders.processed.region` (missing final segment) |
| `#` | Any routing key | None (matches everything) |

**Performance Note:** SlimMessageBus optimizes routing key matching by:

- Using exact matches first for better performance
- Only applying wildcard pattern matching when no exact match is found
- Caching routing key patterns for efficient lookup

#### Acknowledgment Mode

When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments, out of which two are available in SMB:

- Ack - to indicate the message was successfully processed and should be removed from the queue.
- Nack (negative ack) - when the message was processed but resulted in an error, and it needs to be either removed from the queue or retried (depending on what the user chooses in the given use case).

In SMB we can set the acknowledgment mode for each consumer:

```cs
builder.Consume(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // Set the acknowledgement mode, the ConfirmAfterMessageProcessingWhenNoManualConfirmMade is the default
    .AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade)
    .WithConsumer());
```

Alternatively, a bus wide default can be specified for all consumers:

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // Set the acknowledgement mode, the ConfirmAfterMessageProcessingWhenNoManualConfirmMade is the default
        cfg.AcknowledgementMode(RabbitMqMessageAcknowledgementMode.ConfirmAfterMessageProcessingWhenNoManualConfirmMade);
    });
});
```

See [RabbitMqMessageAcknowledgementMode](../src/SlimMessageBus.Host.RabbitMQ/Config/RabbitMqMessageAcknowledgementMode.cs) for the available options.

By default (`ConfirmAfterMessageProcessingWhenNoManualConfirmMade`), messages are acknowledged (Ack) after the message processing finishes with success.
If an exception were to happen, the message is rejected (Nack) (or else whatever the [custom error handler](#consumer-error-handling) logic does).
In that default mode, the user can still Ack or Nack the message depending on the need inside of the consumer or interceptor using the Ack() / Nack() methods exposed on the [ConsumerContext](intro.md#consumer-context-additional-message-information) - this allows for manual acknowledgements.

The default setting is optimal and safe. However, message retries could happen (at-least-once delivery).
The other acknowledgement modes will ack the message before processing, but are less safe as they lead to at-most-once delivery.

#### Consumer Error Handling

By default the transport implementation performs a negative ack (nack) in the AMQP protocol for any message that failed in the consumer. As a result the message will be marked as failed and routed to a dead letter exchange or discarded by the RabbitMQ broker.

The recommendation here is to either:

- configure a [dead letter exchange](#dead-letter-exchange) on the consumer queue,
- or provide a [custom error handler](#custom-consumer-error-handler) (retry the message a couple of times, and if it still fails send it to a dead letter exchange).

##### Dead Letter Exchange

[Dead Letter Exchanges](https://www.rabbitmq.com/dlx.html) are a feature of RabbitMQ that forwards failed messages from a particular queue to a special exchange.

In SMB on the consumer declaration we can specify which dead letter exchange should be used:

```cs
mbb.Consume(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // The queue provisioned in RabbitMQ will have a reference to the dead letter exchange
    .DeadLetterExchange("subscriber-dlq")
    .WithConsumer());
```

However, the `subscriber-dlq` exchange will not be created by SMB in this sample. For it to be created the `ExchangeType` has to be specified, so that SMB knows which exchange type to apply.
It can be specified on the consumer:

```cs
mbb.Consume(x => x
    .Queue("subscriber", autoDelete: false)
    .ExchangeBinding(topic)
    // The queue provisioned in RabbitMQ will have a reference to the dead letter exchange and the DL exchange will be provisioned
    .DeadLetterExchange("subscriber-dlq", exchangeType: ExchangeType.Direct)
    .WithConsumer());
```

Alternatively, a bus wide default can be specified for all dead letter exchanges:

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        // All the declared dead letter exchanges on the consumers will be of Direct type
        cfg.UseDeadLetterExchangeDefaults(durable: false, autoDelete: false, exchangeType: ExchangeType.Direct, routingKey: string.Empty);
    });
});
```

##### Custom Consumer Error Handler

Define a custom class implementation of `IRabbitMqConsumerErrorHandler<>`:

```cs
public class CustomRabbitMqConsumerErrorHandler<T> : IRabbitMqConsumerErrorHandler<T>
{
    // Inject needed dependencies via construction

    public async Task<bool> OnHandleError(T message, Func retry, IConsumerContext consumerContext, Exception exception)
    {
        // Check if this is consumer context for RabbitMQ
        var isRabbitMqContext = consumerContext.GetTransportMessage() != null;
        if (isRabbitMqContext)
        {
            if (exception is TransientException)
            {
                // Send negative acknowledge but ask the broker to retry
                consumerContext.NackWithRequeue();
            }
            else
            {
                // Send negative acknowledge (if dead letter setup it will be routed to it)
                consumerContext.Nack();
            }
            // Mark that the errored message was handled
            return true;
        }
        return false;
    }
}
```

Then register the implementation in MSDI for all (or specified) message types.

```cs
// Register error handler in MSDI for any message type
services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabbitMqConsumerErrorHandler<>));
```

> When an error handler is not found in the DI for the given message type, or it returns `false`, then default error handling will be applied.
See also the common [error handling](intro.md#error-handling).

#### Consumer Concurrency Level

By default each consumer in the service process will handle one message at a time.
In order to increase the desired concurrency, set the [`ConsumerDispatchConcurrency`](https://www.rabbitmq.com/dotnet-api-guide.html#consumer-callbacks-and-ordering) to a value greater than 1.
This is a setting from the underlying RabbitMQ driver that SMB uses.

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.ConnectionFactory.ConsumerDispatchConcurrency = 2; // default is 1
        // ...
    });
});
```

> Notice that increasing concurrency will cause more messages to be processed at the same time within one service instance, hence affecting order of consumption.
> In scenarios where order of consumption is important, you may want to keep concurrency levels set to 1.

### Request-Response

Here is an example of how to set up a request-response flow over RabbitMQ. The fanout exchange type is used, but other types could be used as well (although we might have to provide the [routing key provider](#producers) on the producer side).

```cs
services.AddSlimMessageBus((mbb) =>
{
    // ...

    mbb.Produce(x =>
    {
        // The requests should be sent to the "test-echo" exchange
        x.Exchange("test-echo", exchangeType: ExchangeType.Fanout);
    })
    .Handle(x => x
        // Declare the queue for the handler
        .Queue("echo-request-handler")
        // Bind the queue to the "test-echo" exchange
        .ExchangeBinding("test-echo")
        // If the request handling fails, the failed messages will be routed to the DLQ exchange
        .DeadLetterExchange("echo-request-handler-dlq")
        .WithHandler())
    .ExpectRequestResponses(x =>
    {
        // Tell the handler which exchange to send the responses to
        x.ReplyToExchange("test-echo-resp", ExchangeType.Fanout);
        // Which queue to use to read responses from
        x.Queue("test-echo-resp-queue");
        // Bind to the reply to exchange
        x.ExchangeBinding();
        // Timeout if the response doesn't arrive within 60 seconds
        x.DefaultTimeout(TimeSpan.FromSeconds(60));
    });
});
```

## Topology Provisioning

SMB automatically creates exchanges from producers, and queues, dead letter exchanges and bindings from consumers.

However, if you need to layer on other topology elements (or perform cleanup) this can be achieved with `UseTopologyInitializer()`:

```cs
services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
        {
            // perform some cleanup if needed
            channel.QueueDelete("subscriber-0", ifUnused: true, ifEmpty: false);
            channel.QueueDelete("subscriber-1", ifUnused: true, ifEmpty: false);
            channel.ExchangeDelete("test-ping", ifUnused: true);
            channel.ExchangeDelete("subscriber-dlq", ifUnused: true);

            // apply default SMB inferred topology
            applyDefaultTopology();
        });
    });
});
```

Skipping the call to `applyDefaultTopology()` will suppress the SMB inferred topology creation.
This might be useful in case the SMB inferred topology is not desired or there are other custom needs.
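As an illustration of suppressing the inferred topology, the sketch below declares everything by hand and never calls `applyDefaultTopology()`. It assumes that `channel` is the synchronous `IModel` from RabbitMQ.Client 6.x (as the `QueueDelete` and `ExchangeDelete` calls above suggest); on a client version with an async-only channel API the calls would differ. The exchange and queue names (`custom-events`, `custom-dlx`, `custom-events-queue`) are hypothetical and not part of SMB.

```cs
using System.Collections.Generic;
using RabbitMQ.Client;

services.AddSlimMessageBus((mbb) =>
{
    mbb.WithProviderRabbitMQ(cfg =>
    {
        cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
        {
            // applyDefaultTopology() is intentionally not called here,
            // so SMB will not create its inferred topology.

            // Hypothetical exchanges declared directly with RabbitMQ.Client
            channel.ExchangeDeclare("custom-dlx", ExchangeType.Direct, durable: true, autoDelete: false);
            channel.ExchangeDeclare("custom-events", ExchangeType.Topic, durable: true, autoDelete: false);

            // Hypothetical queue that dead-letters into custom-dlx
            channel.QueueDeclare("custom-events-queue", durable: true, exclusive: false, autoDelete: false,
                arguments: new Dictionary<string, object>
                {
                    ["x-dead-letter-exchange"] = "custom-dlx"
                });

            // Route every key starting with "events." to the queue
            channel.QueueBind("custom-events-queue", "custom-events", routingKey: "events.#");
        });
    });
});
```

With this approach the producers and consumers declared on the bus still have to reference the same exchange and queue names, and the declared settings (durability, arguments) should match what they expect, since nothing is inferred on their behalf.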
## Default Exchange

In RabbitMQ, the default exchange (sometimes referred to as the default direct exchange) is a pre-declared, nameless direct exchange with a special behavior:

- Its name is an empty string (`""`).
- It is of type direct.
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queue's name.

This means that when you publish a message to the default exchange (exchange name = `""`) with a routing key set to the queue name, the message is delivered directly to that queue; no explicit binding is needed. For example, publishing with the routing key `my_queue` delivers the message straight to the `my_queue` queue.

### Why it exists

The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding.
It's often used for simple "Hello World" style examples and direct queue messaging.

✅ **Key points to remember**

- The default exchange has no name (`""`).
- Type: direct.
- Auto-binds every queue by its own name.
- Messages published to it must use the queue's name as the routing key.

## Connection Resiliency

The `RabbitMqChannelManager` provides enhanced connection resilience with automatic consumer recovery:

- **Continuous Retry**: Unlike the previous 3-attempt limit, the connection now retries indefinitely in the background
- **Automatic Recovery**: Integrates with RabbitMQ's built-in automatic recovery features
- **Connection Monitoring**: Monitors connection shutdown events and triggers reconnection
- **Consumer Re-registration**: When the connection is recovered, all consumers automatically re-register themselves with the new channel
- **Health Integration**: The health check provides visibility into connection retry status
- **Thread-Safe Operations**: All channel operations are properly synchronized

### Consumer Recovery

When a RabbitMQ server restarts or the connection is lost, SlimMessageBus automatically:

1. Detects the connection loss through shutdown events
2. Attempts to reconnect at regular intervals (default: every 5 seconds, configurable via `ConnectionFactory.NetworkRecoveryInterval`)
3. Recreates the channel and provisions the topology (exchanges, queues, bindings)
4. Notifies all consumers to re-register themselves with the new channel
5. Resumes message processing automatically

This ensures that:

- Messages don't pile up in queues during temporary outages
- Consumers are visible in the RabbitMQ management UI after recovery
- No manual intervention is required to restore message processing

**Example scenario**: If you restart your RabbitMQ Docker container, SlimMessageBus will detect the disconnection, wait for the server to come back online, and automatically restore all consumers without any code changes or manual restarts.

## Recipes

### 01 Multiple consumers on the same queue with different concurrency

The same `queue` will be used to consume messages, however there will be internal message processors running with different concurrency (10 and 1) depending on the message routing key.

```csharp
services.AddSlimMessageBus(mbb =>
{
    mbb.Produce(x => x
        .Exchange("exchange", exchangeType: ExchangeType.Direct)
        .RoutingKeyProvider((m, p) => m.Label));

    // messages with blue routing key will get 1 concurrency
    mbb.Consume(x => x
        .Queue("queue", autoDelete: false)
        .ExchangeBinding("exchange", routingKey: "blue")
        .Instances(1));

    // messages with red routing key will get 10 concurrency
    mbb.Consume(x => x
        .Queue("queue", autoDelete: false)
        .ExchangeBinding("exchange", routingKey: "red")
        .Instances(10));
});
```

## Feedback

Open a GitHub issue if you need a feature, have a suggestion for improvement, or want to contribute an enhancement.