Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions docs/provider_rabbitmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat
- [Configuration](#configuration)
- [Producers](#producers)
- [Consumers](#consumers)
- [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support)
- [Basic Routing Keys](#basic-routing-keys)
- [Wildcard Routing Keys](#wildcard-routing-keys)
- [Acknowledgment Mode](#acknowledgment-mode)
- [Consumer Error Handling](#consumer-error-handling)
- [Dead Letter Exchange](#dead-letter-exchange)
Expand Down Expand Up @@ -148,6 +151,79 @@ We can specify defaults for all consumers on the bus level:
});
```

#### Routing Keys and Wildcard Support

RabbitMQ routing keys are used by exchanges to determine which queues should receive a message. SlimMessageBus fully supports RabbitMQ's routing key semantics, including **wildcard routing keys** for topic exchanges.

##### Basic Routing Keys

For direct and topic exchanges, you can specify exact routing keys when binding consumers:

```cs
mbb.Consume<OrderEvent>(x => x
.Queue("orders-queue")
.ExchangeBinding("orders-exchange", routingKey: "orders.created")
.WithConsumer<OrderCreatedConsumer>());
```

##### Wildcard Routing Keys

For topic exchanges, SlimMessageBus supports RabbitMQ's wildcard routing key patterns:

- **`*` (asterisk)** - matches exactly one segment
- **`#` (hash)** - matches zero or more segments
- Segments are separated by `.` (dot)

**Examples:**

```cs
services.AddSlimMessageBus(mbb =>
{
// Producer sends messages with specific routing keys
mbb.Produce<RegionEvent>(x => x
.Exchange("regions", exchangeType: ExchangeType.Topic)
.RoutingKeyProvider((m, p) => $"regions.{m.Country}.cities.{m.City}"));

// Consumer 1: Match all cities in North America
mbb.Consume<RegionEvent>(x => x
.Queue("na-cities-queue")
.ExchangeBinding("regions", routingKey: "regions.na.cities.*") // * matches exactly one city
.WithConsumer<NorthAmericaCitiesConsumer>());

// Consumer 2: Match all events in the regions exchange
mbb.Consume<RegionEvent>(x => x
.Queue("all-regions-queue")
.ExchangeBinding("regions", routingKey: "regions.#") // # matches zero or more segments
.WithConsumer<AllRegionsConsumer>());

// Consumer 3: Match all audit events with any number of segments
mbb.Consume<AuditEvent>(x => x
.Queue("audit-queue")
.ExchangeBinding("audit", routingKey: "audit.events.#")
.WithConsumer<AuditConsumer>());

// Consumer 4: Complex pattern - match region events ending with specific pattern
mbb.Consume<RegionEvent>(x => x
.Queue("region-reports-queue")
.ExchangeBinding("regions", routingKey: "regions.*.reports.*") // matches regions.{country}.reports.{type}
.WithConsumer<RegionReportsConsumer>());
});
```

**Routing Key Pattern Examples:**

| Pattern | Matches | Doesn't Match |
|---------|---------|---------------|
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
| `#` | Any routing key | None (matches everything) |

**Performance Note:** SlimMessageBus optimizes routing key matching by:
- Using exact matches first for better performance
- Only applying wildcard pattern matching when no exact match is found
- Caching routing key patterns for efficient lookup

#### Acknowledgment Mode

When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB:
Expand Down Expand Up @@ -370,7 +446,7 @@ In RabbitMQ, the default exchange (sometimes referred to as the default direct e

- Its name is an empty string (`""`).
- It is of type direct.
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queues name.
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queue's name.

This means:

Expand All @@ -380,14 +456,14 @@ This will deliver the message straight to the `my_queue` queue.

### Why it exists

The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding. Its often used for simple "Hello World" style examples and direct queue messaging.
The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding. It's often used for simple "Hello World" style examples and direct queue messaging.

✅ **Key points to remember**

- The default exchange has no name (`""`).
- Type: direct.
- Auto-binds every queue by its own name.
- Messages published to it must use the queues name as the routing key.
- Messages published to it must use the queue's name as the routing key.

## Connection Resiliency

Expand Down
2 changes: 1 addition & 1 deletion src/Host.Plugin.Properties.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<Import Project="Common.NuGet.Properties.xml" />

<PropertyGroup>
<Version>3.3.2</Version>
<Version>3.3.3-rc100</Version>
</PropertyGroup>

</Project>
33 changes: 25 additions & 8 deletions src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer

private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode;
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
private readonly IDictionary<string, IMessageProcessor<BasicDeliverEventArgs>> _messageProcessorByRoutingKey;
private readonly RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>> _routingKeyMatcher;

protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;

Expand Down Expand Up @@ -57,25 +57,42 @@ IMessageProcessor<BasicDeliverEventArgs> CreateMessageProcessor(IEnumerable<Cons
return messageProcessor;
}

_messageProcessorByRoutingKey = consumers
var routingKeyProcessors = consumers
.GroupBy(x => x.GetBindingRoutingKey() ?? string.Empty)
.ToDictionary(x => x.Key, CreateMessageProcessor);

_messageProcessor = _messageProcessorByRoutingKey.Count == 1 && _messageProcessorByRoutingKey.TryGetValue(string.Empty, out var value)
// Initialize routing key matcher service
_routingKeyMatcher = new RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>>(routingKeyProcessors);

// Set single processor optimization if only one exact match exists and no wildcards
_messageProcessor = routingKeyProcessors.Count == 1 && routingKeyProcessors.TryGetValue(string.Empty, out var value)
? value : null;
}

private IMessageProcessor<BasicDeliverEventArgs> FindMessageProcessor(string messageRoutingKey)
{
// If only single processor for all messages, return it directly for better performance
if (_messageProcessor != null)
{
return _messageProcessor;
}

// Use routing key matcher service to find the appropriate processor
return _routingKeyMatcher.FindMatch(messageRoutingKey);
}

protected override async Task OnStop()
{
try
{
// Wait max 5 seconds for all background processing tasks to complete
using var taskCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var backgrounProcessingTasks = _messageProcessorByRoutingKey.Values
var backgroundProcessingTasks = _routingKeyMatcher.AllItems
.Distinct()
.OfType<ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>>()
.Select(x => x.WaitAll(taskCancellationSource.Token));

await Task.WhenAll(backgrounProcessingTasks);
await Task.WhenAll(backgroundProcessingTasks);
}
catch (Exception e)
{
Expand Down Expand Up @@ -139,8 +156,8 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOptions.Ack, consumerContextProperties);
}

var messageProcessor = _messageProcessor;
if (messageProcessor != null || _messageProcessorByRoutingKey.TryGetValue(transportMessage.RoutingKey, out messageProcessor))
var messageProcessor = FindMessageProcessor(transportMessage.RoutingKey);
if (messageProcessor != null)
{
await messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);
}
Expand All @@ -157,7 +174,7 @@ protected override async ValueTask DisposeAsyncCore()
{
await base.DisposeAsyncCore();

foreach (var messageProcessor in _messageProcessorByRoutingKey.Values)
foreach (var messageProcessor in _routingKeyMatcher.AllItems.Distinct())
{
if (messageProcessor is IDisposable disposable)
{
Expand Down
Loading
Loading