Skip to content

Commit 94858b5

Browse files
committed
[Host.RabbitMq] Wildcard support in routing key #431
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent dc0df83 commit 94858b5

File tree

7 files changed

+526
-12
lines changed

7 files changed

+526
-12
lines changed

docs/provider_rabbitmq.md

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ Please read the [Introduction](intro.md) before reading this provider documentat
77
- [Configuration](#configuration)
88
- [Producers](#producers)
99
- [Consumers](#consumers)
10+
- [Routing Keys and Wildcard Support](#routing-keys-and-wildcard-support)
11+
- [Basic Routing Keys](#basic-routing-keys)
12+
- [Wildcard Routing Keys](#wildcard-routing-keys)
1013
- [Acknowledgment Mode](#acknowledgment-mode)
1114
- [Consumer Error Handling](#consumer-error-handling)
1215
- [Dead Letter Exchange](#dead-letter-exchange)
@@ -148,6 +151,79 @@ We can specify defaults for all consumers on the bus level:
148151
});
149152
```
150153

154+
#### Routing Keys and Wildcard Support
155+
156+
RabbitMQ routing keys are used by exchanges to determine which queues should receive a message. SlimMessageBus fully supports RabbitMQ's routing key semantics, including **wildcard routing keys** for topic exchanges.
157+
158+
##### Basic Routing Keys
159+
160+
For direct and topic exchanges, you can specify exact routing keys when binding consumers:
161+
162+
```cs
163+
mbb.Consume<OrderEvent>(x => x
164+
.Queue("orders-queue")
165+
.ExchangeBinding("orders-exchange", routingKey: "orders.created")
166+
.WithConsumer<OrderCreatedConsumer>());
167+
```
168+
169+
##### Wildcard Routing Keys
170+
171+
For topic exchanges, SlimMessageBus supports RabbitMQ's wildcard routing key patterns:
172+
173+
- **`*` (asterisk)** - matches exactly one segment
174+
- **`#` (hash)** - matches zero or more segments
175+
- Segments are separated by `.` (dot)
176+
177+
**Examples:**
178+
179+
```cs
180+
services.AddSlimMessageBus(mbb =>
181+
{
182+
// Producer sends messages with specific routing keys
183+
mbb.Produce<RegionEvent>(x => x
184+
.Exchange("regions", exchangeType: ExchangeType.Topic)
185+
.RoutingKeyProvider((m, p) => $"regions.{m.Country}.cities.{m.City}"));
186+
187+
// Consumer 1: Match all cities in North America
188+
mbb.Consume<RegionEvent>(x => x
189+
.Queue("na-cities-queue")
190+
.ExchangeBinding("regions", routingKey: "regions.na.cities.*") // * matches exactly one city
191+
.WithConsumer<NorthAmericaCitiesConsumer>());
192+
193+
// Consumer 2: Match all events in the regions exchange
194+
mbb.Consume<RegionEvent>(x => x
195+
.Queue("all-regions-queue")
196+
.ExchangeBinding("regions", routingKey: "regions.#") // # matches zero or more segments
197+
.WithConsumer<AllRegionsConsumer>());
198+
199+
// Consumer 3: Match all audit events with any number of segments
200+
mbb.Consume<AuditEvent>(x => x
201+
.Queue("audit-queue")
202+
.ExchangeBinding("audit", routingKey: "audit.events.#")
203+
.WithConsumer<AuditConsumer>());
204+
205+
// Consumer 4: Complex pattern - match region events ending with specific pattern
206+
mbb.Consume<RegionEvent>(x => x
207+
.Queue("region-reports-queue")
208+
.ExchangeBinding("regions", routingKey: "regions.*.reports.*") // matches regions.{country}.reports.{type}
209+
.WithConsumer<RegionReportsConsumer>());
210+
});
211+
```
212+
213+
**Routing Key Pattern Examples:**
214+
215+
| Pattern | Matches | Doesn't Match |
216+
|---------|---------|---------------|
217+
| `regions.na.cities.*` | `regions.na.cities.toronto`<br/>`regions.na.cities.newyork` | `regions.na.cities` (missing segment)<br/>`regions.na.cities.toronto.downtown` (extra segment) |
218+
| `audit.events.#` | `audit.events.users.signup`<br/>`audit.events.orders.placed`<br/>`audit.events` | `audit.users` (wrong prefix) |
219+
| `orders.#.region.*` | `orders.processed.region.na`<br/>`orders.created.cancelled.region.eu`<br/>`orders.region.na` | `orders.processed.state.california` (wrong pattern)<br/>`orders.processed.region` (missing final segment) |
220+
| `#` | Any routing key | None (matches everything) |
221+
222+
**Performance Note:** SlimMessageBus optimizes routing key matching by:
223+
- Using exact matches first for better performance
224+
- Only applying wildcard pattern matching when no exact match is found
225+
- Caching routing key patterns for efficient lookup
226+
151227
#### Acknowledgment Mode
152228

153229
When a consumer processes a message from a RabbitMQ queue, it needs to acknowledge that the message was processed. RabbitMQ supports three types of acknowledgments out which two are available in SMB:
@@ -370,7 +446,7 @@ In RabbitMQ, the default exchange (sometimes referred to as the default direct e
370446

371447
- Its name is an empty string (`""`).
372448
- It is of type direct.
373-
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queues name.
449+
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queue's name.
374450

375451
This means:
376452

@@ -380,14 +456,14 @@ This will deliver the message straight to the `my_queue` queue.
380456

381457
### Why it exists
382458

383-
The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding. Its often used for simple "Hello World" style examples and direct queue messaging.
459+
The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding. It's often used for simple "Hello World" style examples and direct queue messaging.
384460

385461
✅ **Key points to remember**
386462

387463
- The default exchange has no name (`""`).
388464
- Type: direct.
389465
- Auto-binds every queue by its own name.
390-
- Messages published to it must use the queues name as the routing key.
466+
- Messages published to it must use the queue's name as the routing key.
391467

392468
## Connection Resiliency
393469

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.2</Version>
7+
<Version>3.3.3-rc100</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.RabbitMQ/Consumers/RabbitMqConsumer.cs

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class RabbitMqConsumer : AbstractRabbitMqConsumer, IRabbitMqConsumer
1414

1515
private readonly RabbitMqMessageAcknowledgementMode _acknowledgementMode;
1616
private readonly IMessageProcessor<BasicDeliverEventArgs> _messageProcessor;
17-
private readonly IDictionary<string, IMessageProcessor<BasicDeliverEventArgs>> _messageProcessorByRoutingKey;
17+
private readonly RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>> _routingKeyMatcher;
1818

1919
protected override RabbitMqMessageAcknowledgementMode AcknowledgementMode => _acknowledgementMode;
2020

@@ -57,25 +57,42 @@ IMessageProcessor<BasicDeliverEventArgs> CreateMessageProcessor(IEnumerable<Cons
5757
return messageProcessor;
5858
}
5959

60-
_messageProcessorByRoutingKey = consumers
60+
var routingKeyProcessors = consumers
6161
.GroupBy(x => x.GetBindingRoutingKey() ?? string.Empty)
6262
.ToDictionary(x => x.Key, CreateMessageProcessor);
6363

64-
_messageProcessor = _messageProcessorByRoutingKey.Count == 1 && _messageProcessorByRoutingKey.TryGetValue(string.Empty, out var value)
64+
// Initialize routing key matcher service
65+
_routingKeyMatcher = new RoutingKeyMatcherService<IMessageProcessor<BasicDeliverEventArgs>>(routingKeyProcessors);
66+
67+
// Set single processor optimization if only one exact match exists and no wildcards
68+
_messageProcessor = routingKeyProcessors.Count == 1 && routingKeyProcessors.TryGetValue(string.Empty, out var value)
6569
? value : null;
6670
}
6771

72+
private IMessageProcessor<BasicDeliverEventArgs> FindMessageProcessor(string messageRoutingKey)
73+
{
74+
// If only single processor for all messages, return it directly for better performance
75+
if (_messageProcessor != null)
76+
{
77+
return _messageProcessor;
78+
}
79+
80+
// Use routing key matcher service to find the appropriate processor
81+
return _routingKeyMatcher.FindMatch(messageRoutingKey);
82+
}
83+
6884
protected override async Task OnStop()
6985
{
7086
try
7187
{
7288
// Wait max 5 seconds for all background processing tasks to complete
7389
using var taskCancellationSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
74-
var backgrounProcessingTasks = _messageProcessorByRoutingKey.Values
90+
var backgroundProcessingTasks = _routingKeyMatcher.AllItems
91+
.Distinct()
7592
.OfType<ConcurrentMessageProcessorDecorator<BasicDeliverEventArgs>>()
7693
.Select(x => x.WaitAll(taskCancellationSource.Token));
7794

78-
await Task.WhenAll(backgrounProcessingTasks);
95+
await Task.WhenAll(backgroundProcessingTasks);
7996
}
8097
catch (Exception e)
8198
{
@@ -139,8 +156,8 @@ protected override async Task<Exception> OnMessageReceived(Dictionary<string, ob
139156
ConfirmMessage(transportMessage, RabbitMqMessageConfirmOptions.Ack, consumerContextProperties);
140157
}
141158

142-
var messageProcessor = _messageProcessor;
143-
if (messageProcessor != null || _messageProcessorByRoutingKey.TryGetValue(transportMessage.RoutingKey, out messageProcessor))
159+
var messageProcessor = FindMessageProcessor(transportMessage.RoutingKey);
160+
if (messageProcessor != null)
144161
{
145162
await messageProcessor.ProcessMessage(transportMessage, messageHeaders: messageHeaders, consumerContextProperties: consumerContextProperties, cancellationToken: CancellationToken);
146163
}
@@ -157,7 +174,7 @@ protected override async ValueTask DisposeAsyncCore()
157174
{
158175
await base.DisposeAsyncCore();
159176

160-
foreach (var messageProcessor in _messageProcessorByRoutingKey.Values)
177+
foreach (var messageProcessor in _routingKeyMatcher.AllItems.Distinct())
161178
{
162179
if (messageProcessor is IDisposable disposable)
163180
{

0 commit comments

Comments
 (0)