Skip to content

Commit 99baec2

Browse files
MaxNauzarusz
authored andcommitted
[Host.RabbitMQ] Improve Validation and Default Exchange Handling
Signed-off-by: Max Nau <[email protected]> Signed-off-by: governor <[email protected]> Add integration test for default exchange publishing and consuming Signed-off-by: Max Nau <[email protected]> Signed-off-by: governor <[email protected]> Update readme with default exchange section Signed-off-by: governor <[email protected]> Add RabbitMqTopologyServiceTests Signed-off-by: governor <[email protected]> Add RabbitMqMessageBusSettingsValidationServiceTests Signed-off-by: governor <[email protected]>
1 parent 00062c2 commit 99baec2

File tree

6 files changed

+364
-6
lines changed

6 files changed

+364
-6
lines changed

docs/provider_rabbitmq.md

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,40 @@ services.AddSlimMessageBus((mbb) =>
362362
Avoiding the call `applyDefaultTopology()` will suppress the SMB inferred topology creation.
363363
This might be useful in case the SMB inferred topology is not desired or there are other custom needs.
364364

365-
## Not Supported
365+
### Default Exchange
366366

367-
- [Default type exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default) are not yet supported
368-
- Broker generated queues are not yet supported.
367+
In RabbitMQ, the default exchange (sometimes referred to as the default direct exchange) is a pre-declared, nameless direct exchange with a special behavior:
368+
369+
- Its name is an empty string (`""`).
370+
- It is of type direct.
371+
- Every queue that you declare is automatically bound to this default exchange with a routing key equal to the queues name.
372+
373+
This means:
374+
375+
- When you publish a message to the default exchange (exchange name = `""`) with a routing key set to the queue name, the message is delivered directly to that queueno explicit binding is needed.
376+
377+
Example:
378+
379+
```csharp
380+
channel.basic_publish(
381+
exchange: "", // default exchange
382+
routing_key: "my_queue", // must match the queue name
383+
body: Encoding.UTF8.GetBytes("Hello World!")
384+
);
385+
```
386+
387+
This will deliver the message straight to the `my_queue` queue.
388+
389+
### Why it exists
390+
391+
The default exchange makes it easy to send messages directly to a queue without having to explicitly set up an exchange and binding. Its often used for simple "Hello World" style examples and direct queue messaging.
392+
393+
**Key points to remember**
394+
395+
- The default exchange has no name (`""`).
396+
- Type: direct.
397+
- Auto-binds every queue by its own name.
398+
- Messages published to it must use the queues name as the routing key.
369399

370400
## Recipes
371401

src/SlimMessageBus.Host.RabbitMQ/RabbitMqMessageBusSettingsValidationService.cs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ public override void AssertSettings()
2020

2121
protected override void AssertProducer(ProducerSettings producerSettings)
2222
{
23-
base.AssertProducer(producerSettings);
24-
2523
var exchangeName = producerSettings.DefaultPath;
2624
if (exchangeName == null)
2725
{
@@ -42,7 +40,20 @@ protected override void AssertProducer(ProducerSettings producerSettings)
4240

4341
protected override void AssertConsumer(ConsumerSettings consumerSettings)
4442
{
45-
base.AssertConsumer(consumerSettings);
43+
if (consumerSettings == null) throw new ArgumentNullException(nameof(consumerSettings));
44+
45+
if (consumerSettings.MessageType == null)
46+
{
47+
ThrowConsumerFieldNotSet(consumerSettings, nameof(consumerSettings.MessageType));
48+
}
49+
if (consumerSettings.ConsumerType == null)
50+
{
51+
ThrowConsumerFieldNotSet(consumerSettings, nameof(consumerSettings.ConsumerType));
52+
}
53+
if (consumerSettings.ConsumerMethod == null)
54+
{
55+
ThrowConsumerFieldNotSet(consumerSettings, nameof(consumerSettings.ConsumerMethod));
56+
}
4657

4758
var exchangeName = consumerSettings.Path;
4859
if (exchangeName == null)

src/SlimMessageBus.Host.RabbitMQ/RabbitMqTopologyService.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ private void DeclareQueueBinding(AbstractConsumerSettings settings, string bindi
8080
{
8181
var bindingRoutingKey = settings.GetBindingRoutingKey(_providerSettings) ?? string.Empty;
8282

83+
if (string.IsNullOrEmpty(bindingExchangeName))
84+
{
85+
_logger.LogInformation("Skipping binding for queue {QueueName} because exchange is default (empty string)", queueName);
86+
return;
87+
}
88+
8389
_logger.LogInformation("Binding queue {QueueName} to exchange {ExchangeName} using routing key {RoutingKey}", queueName, bindingExchangeName, bindingRoutingKey);
8490
try
8591
{
@@ -136,6 +142,12 @@ private string DeclareQueue(HasProviderExtensions settings, string queueName, Ac
136142

137143
private void DeclareExchange(HasProviderExtensions settings, string exchangeName)
138144
{
145+
if (string.IsNullOrEmpty(exchangeName))
146+
{
147+
_logger.LogInformation("Skipping exchange declaration because exchange name is default (empty string)");
148+
return;
149+
}
150+
139151
var exchangeType = settings.GetOrDefault(RabbitMqProperties.ExchangeType, _providerSettings, global::RabbitMQ.Client.ExchangeType.Fanout);
140152
var durable = settings.GetOrDefault(RabbitMqProperties.ExchangeDurable, _providerSettings, false);
141153
var autoDelete = settings.GetOrDefault(RabbitMqProperties.ExchangeAutoDelete, _providerSettings, false);
@@ -146,6 +158,12 @@ private void DeclareExchange(HasProviderExtensions settings, string exchangeName
146158

147159
private void DeclareExchange(string exchangeName, string exchangeType, bool durable, bool autoDelete, IDictionary<string, object> arguments = null)
148160
{
161+
if (string.IsNullOrEmpty(exchangeName))
162+
{
163+
_logger.LogInformation("Skipping exchange declaration because exchange name is default (empty string)");
164+
return;
165+
}
166+
149167
_logger.LogInformation("Declaring exchange {ExchangeName}, ExchangeType: {ExchangeType}, Durable: {Durable}, AutoDelete: {AutoDelete}", exchangeName, exchangeType, durable, autoDelete);
150168
try
151169
{
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
namespace SlimMessageBus.Host.RabbitMQ.Test.IntegrationTests;
2+
3+
using System.Linq;
4+
using System.Net.Mime;
5+
using System.Threading.Tasks;
6+
7+
using Microsoft.Extensions.Configuration;
8+
9+
using SlimMessageBus.Host.Serialization.Json;
10+
using SlimMessageBus.Host.Test.Common.IntegrationTest;
11+
12+
[Trait("Category", "Integration")]
13+
[Trait("Transport", "RabbitMQ")]
14+
public class RabbitMqDefaultExchangeIt(ITestOutputHelper output) : BaseIntegrationTest<RabbitMqDefaultExchangeIt>(output)
15+
{
16+
[Fact]
17+
public async Task PublishDirectlyToQueueUsingDefaultExchange()
18+
{
19+
const string queueName = "default-exchange-queue";
20+
21+
AddBusConfiguration(mbb =>
22+
{
23+
mbb.Produce<PingMessage>(x => x
24+
.DefaultPath(string.Empty)
25+
.RoutingKeyProvider((m, ctx) => queueName)
26+
.MessagePropertiesModifier((m, p) =>
27+
{
28+
p.MessageId = $"ID_{m.Counter}";
29+
p.ContentType = MediaTypeNames.Application.Json;
30+
}));
31+
32+
mbb.Consume<PingMessage>(x => x
33+
.Path(string.Empty) // default exchange
34+
.Queue(queueName)
35+
.WithConsumer<PingConsumer>());
36+
});
37+
38+
var messageBus = ServiceProvider.GetRequiredService<IMessageBus>();
39+
var consumedMessages = ServiceProvider.GetRequiredService<TestEventCollector<TestEvent>>();
40+
41+
var ping = new PingMessage { Counter = 42 };
42+
43+
// act
44+
await messageBus.Publish(ping);
45+
46+
await consumedMessages.WaitUntilArriving();
47+
48+
// assert
49+
var received = consumedMessages.Snapshot().Single();
50+
received.Message.Counter.Should().Be(42);
51+
received.Message.Value.Should().Be(ping.Value);
52+
}
53+
54+
protected override void SetupServices(ServiceCollection services, IConfigurationRoot configuration)
55+
{
56+
services.AddSlimMessageBus((mbb) =>
57+
{
58+
mbb.WithProviderRabbitMQ(cfg =>
59+
{
60+
cfg.ConnectionString = Secrets.Service.PopulateSecrets(configuration["RabbitMQ:ConnectionString"]);
61+
62+
cfg.ConnectionFactory.ClientProvidedName = $"MyService_{Environment.MachineName}";
63+
64+
cfg.UseMessagePropertiesModifier((m, p) =>
65+
{
66+
p.ContentType = MediaTypeNames.Application.Json;
67+
});
68+
cfg.UseQueueDefaults(durable: false);
69+
cfg.UseTopologyInitializer((channel, applyDefaultTopology) =>
70+
{
71+
// before test clean up
72+
channel.QueueDelete("default-exchange-queue", ifUnused: true, ifEmpty: false);
73+
74+
// apply default SMB inferred topology
75+
applyDefaultTopology();
76+
77+
// after
78+
});
79+
});
80+
mbb.AddServicesFromAssemblyContaining<PingConsumer>();
81+
mbb.AddJsonSerializer();
82+
ApplyBusConfiguration(mbb);
83+
});
84+
85+
// Custom error handler
86+
services.AddTransient(typeof(IRabbitMqConsumerErrorHandler<>), typeof(CustomRabbitMqConsumerErrorHandler<>));
87+
88+
services.AddSingleton<TestEventCollector<TestEvent>>();
89+
}
90+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
namespace SlimMessageBus.Host.RabbitMQ.Test;
2+
using System;
3+
4+
using AwesomeAssertions;
5+
6+
using Xunit;
7+
8+
public class RabbitMqMessageBusSettingsValidationServiceTests
9+
{
10+
internal class TestableRabbitMqValidationService : RabbitMqMessageBusSettingsValidationService
11+
{
12+
public TestableRabbitMqValidationService(MessageBusSettings settings, RabbitMqMessageBusSettings providerSettings)
13+
: base(settings, providerSettings)
14+
{
15+
}
16+
17+
// Public wrapper to call the protected method
18+
public new void AssertConsumer(ConsumerSettings consumerSettings)
19+
{
20+
base.AssertConsumer(consumerSettings);
21+
}
22+
}
23+
24+
private TestableRabbitMqValidationService CreateService()
25+
{
26+
var busSettings = new MessageBusSettings();
27+
var providerSettings = new RabbitMqMessageBusSettings();
28+
return new TestableRabbitMqValidationService(busSettings, providerSettings);
29+
}
30+
31+
[Fact]
32+
public void AssertConsumer_ShouldThrowArgumentNullException_WhenConsumerSettingsIsNull()
33+
{
34+
var service = CreateService();
35+
36+
service.Invoking(s => s.AssertConsumer(null!))
37+
.Should().Throw<ArgumentNullException>()
38+
.WithMessage("*consumerSettings*");
39+
}
40+
41+
[Fact]
42+
public void AssertConsumer_ShouldThrow_WhenMessageTypeIsNull()
43+
{
44+
var service = CreateService();
45+
var consumerSettings = new ConsumerSettings
46+
{
47+
ConsumerType = typeof(object),
48+
Path = "exchange-name",
49+
};
50+
51+
var expectedMessage = $"Consumer (): The MessageType is not set";
52+
53+
service.Invoking(s => s.AssertConsumer(consumerSettings))
54+
.Should().Throw<ConfigurationMessageBusException>()
55+
.WithMessage(expectedMessage);
56+
}
57+
58+
[Fact]
59+
public void AssertConsumer_ShouldThrow_WhenConsumerTypeIsNull()
60+
{
61+
var service = CreateService();
62+
var consumerSettings = new ConsumerSettings
63+
{
64+
MessageType = typeof(object),
65+
Path = "exchange-name",
66+
};
67+
68+
var expectedMessage = $"Consumer (Object): The ConsumerType is not set";
69+
70+
service.Invoking(s => s.AssertConsumer(consumerSettings))
71+
.Should().Throw<ConfigurationMessageBusException>()
72+
.WithMessage(expectedMessage);
73+
}
74+
75+
[Fact]
76+
public void AssertConsumer_ShouldThrow_WhenConsumerMethodIsNull()
77+
{
78+
var service = CreateService();
79+
var consumerSettings = new ConsumerSettings
80+
{
81+
MessageType = typeof(object),
82+
ConsumerType = typeof(object),
83+
Path = "exchange-name",
84+
};
85+
86+
var expectedMessage = $"Consumer (Object): The ConsumerMethod is not set";
87+
88+
service.Invoking(s => s.AssertConsumer(consumerSettings))
89+
.Should().Throw<ConfigurationMessageBusException>()
90+
.WithMessage(expectedMessage);
91+
}
92+
93+
[Fact]
94+
public void AssertConsumer_ShouldThrow_WhenPathIsNull()
95+
{
96+
var service = CreateService();
97+
var consumerSettings = new ConsumerSettings
98+
{
99+
MessageType = typeof(object),
100+
ConsumerType = typeof(object),
101+
ConsumerMethod = new ConsumerMethod((q, w, e, r) => { return Task.CompletedTask; }),
102+
Path = null,
103+
};
104+
105+
var expectedMessage = $"Consumer (Object): The ExchangeBinding is not set";
106+
107+
service.Invoking(s => s.AssertConsumer(consumerSettings))
108+
.Should().Throw<ConfigurationMessageBusException>()
109+
.WithMessage(expectedMessage);
110+
}
111+
112+
[Fact]
113+
public void AssertConsumer_ShouldThrow_WhenQueueNameIsNull()
114+
{
115+
var service = CreateService();
116+
var consumerSettings = new ConsumerSettings
117+
{
118+
MessageType = typeof(object),
119+
ConsumerType = typeof(object),
120+
ConsumerMethod = new ConsumerMethod((q, w, e, r) => { return Task.CompletedTask; }),
121+
Path = "exchange-name"
122+
};
123+
124+
var expectedMessage = $"Consumer (Object): The Queue is not set";
125+
126+
service.Invoking(s => s.AssertConsumer(consumerSettings))
127+
.Should().Throw<ConfigurationMessageBusException>()
128+
.WithMessage(expectedMessage);
129+
}
130+
}

0 commit comments

Comments
 (0)