Skip to content

Commit f86b034

Browse files
committed
[Host.AzureServiceBus] Create topic/queue dynamically when path param uses a non-default value
Signed-off-by: Tomasz Maruszak <[email protected]>
1 parent c69c06b commit f86b034

File tree

4 files changed

+63
-11
lines changed

4 files changed

+63
-11
lines changed

src/Host.Plugin.Properties.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<Import Project="Common.NuGet.Properties.xml" />
55

66
<PropertyGroup>
7-
<Version>3.3.1-rc100</Version>
7+
<Version>3.3.1-rc101</Version>
88
</PropertyGroup>
99

1010
</Project>

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ public class ServiceBusMessageBus : MessageBusBase<ServiceBusMessageBusSettings>
77
private readonly ILogger _logger;
88
private ServiceBusClient _client;
99
private SafeDictionaryWrapper<string, ServiceBusSender> _producerByPath;
10+
private ServiceBusTopologyService _topologyService;
1011

1112
public ServiceBusMessageBus(MessageBusSettings settings, ServiceBusMessageBusSettings providerSettings)
1213
: base(settings, providerSettings)
1314
{
1415
_logger = LoggerFactory.CreateLogger<ServiceBusMessageBus>();
15-
1616
OnBuildProvider();
1717
}
1818

@@ -46,9 +46,7 @@ protected override async ValueTask DisposeAsyncCore()
4646
public override async Task ProvisionTopology()
4747
{
4848
await base.ProvisionTopology();
49-
50-
var provisioningService = new ServiceBusTopologyService(LoggerFactory.CreateLogger<ServiceBusTopologyService>(), Settings, ProviderSettings);
51-
await provisioningService.ProvisionTopology(); // provisioning happens asynchronously
49+
await _topologyService?.ProvisionTopology(); // provisioning happens asynchronously
5250
}
5351

5452
#region Overrides of MessageBusBase
@@ -59,6 +57,7 @@ protected override void Build()
5957

6058
if (ProviderSettings.TopologyProvisioning?.Enabled ?? false)
6159
{
60+
_topologyService = new ServiceBusTopologyService(LoggerFactory.CreateLogger<ServiceBusTopologyService>(), Settings, ProviderSettings);
6261
InitTaskList.Add(ProvisionTopology, CancellationToken);
6362
}
6463

@@ -133,15 +132,48 @@ public override async Task ProduceToTransport(object message, Type messageType,
133132
{
134133
var transportMessage = GetTransportMessage(message, messageType, messageHeaders, path);
135134
var senderClient = _producerByPath.GetOrAdd(path);
136-
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
137-
_logger.LogDebug("Delivered item {Message} of type {MessageType} to {Path}", message, messageType?.Name, path);
135+
136+
try
137+
{
138+
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
139+
}
140+
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound && _topologyService != null)
141+
{
142+
await EnsurePathExists(messageType, path);
143+
// Resend messages after the path has been created
144+
await senderClient.SendMessageAsync(transportMessage, cancellationToken).ConfigureAwait(false);
145+
}
146+
147+
_logger.LogDebug("Delivered message {Message} of type {MessageType} to {Path}", message, messageType?.Name, path);
138148
}
139149
catch (Exception ex) when (ex is not ProducerMessageBusException && ex is not TaskCanceledException)
140150
{
141151
throw new ProducerMessageBusException(GetProducerErrorMessage(path, message, messageType, ex), ex);
142152
}
143153
}
144154

155+
/// <summary>
156+
/// When the topic or queue does not exist, we can try to create it.
157+
/// This happens in cases where the path is dynamically set upon publish/send (different from default path)
158+
/// </summary>
159+
/// <param name="messageType"></param>
160+
/// <param name="path">topic or queue</param>
161+
/// <returns></returns>
162+
private async Task EnsurePathExists(Type messageType, string path)
163+
{
164+
var producerSettings = GetProducerSettings(messageType);
165+
if (producerSettings.PathKind == PathKind.Topic)
166+
{
167+
_logger.LogInformation("Topic {Path} does not exist, trying to create it", path);
168+
await _topologyService.TryCreateTopic(path, ProviderSettings.TopologyProvisioning.CanProducerCreateTopic);
169+
}
170+
else
171+
{
172+
_logger.LogInformation("Queue {Path} does not exist, trying to create it", path);
173+
await _topologyService.TryCreateQueue(path, ProviderSettings.TopologyProvisioning.CanProducerCreateQueue);
174+
}
175+
}
176+
145177
public override async Task<ProduceToTransportBulkResult<T>> ProduceToTransportBulk<T>(IReadOnlyCollection<T> envelopes, string path, IMessageBusTarget targetBus, CancellationToken cancellationToken)
146178
{
147179
AssertActive();
@@ -150,7 +182,21 @@ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch,
150182
Retry.WithDelay(
151183
operation: async cancellationToken =>
152184
{
153-
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
185+
try
186+
{
187+
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
188+
}
189+
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityNotFound && _topologyService != null)
190+
{
191+
var messageType = envelopes.FirstOrDefault().MessageType;
192+
if (messageType != null)
193+
{
194+
await EnsurePathExists(messageType, path);
195+
// Resend messages after the path has been created
196+
await senderClient.SendMessagesAsync(batch, cancellationToken).ConfigureAwait(false);
197+
}
198+
}
199+
154200
_logger.LogDebug("Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)", batch.Count, path, batch.SizeInBytes);
155201
},
156202
shouldRetry: (exception, attempt) =>

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusTopologyService.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public ServiceBusTopologyService(ILogger<ServiceBusTopologyService> logger, Mess
1616
}
1717

1818
[Flags]
19-
private enum TopologyCreationStatus
19+
internal enum TopologyCreationStatus
2020
{
2121
None = 0,
2222
NotExists = 1,
@@ -51,7 +51,7 @@ private static async Task<T> SwallowExceptionIfMessagingEntityNotFound<T>(Func<T
5151
}
5252
}
5353

54-
private Task<TopologyCreationStatus> TryCreateQueue(string path, bool canCreate, Action<CreateQueueOptions> action) => SwallowExceptionIfEntityExists(async () =>
54+
internal Task<TopologyCreationStatus> TryCreateQueue(string path, bool canCreate, Action<CreateQueueOptions> action = null) => SwallowExceptionIfEntityExists(async () =>
5555
{
5656
if (await _adminClient.QueueExistsAsync(path)) return TopologyCreationStatus.Exists;
5757

@@ -71,7 +71,7 @@ private Task<TopologyCreationStatus> TryCreateQueue(string path, bool canCreate,
7171
return TopologyCreationStatus.Exists | TopologyCreationStatus.Created;
7272
});
7373

74-
private Task<TopologyCreationStatus> TryCreateTopic(string path, bool canCreate, Action<CreateTopicOptions> action) => SwallowExceptionIfEntityExists(async () =>
74+
internal Task<TopologyCreationStatus> TryCreateTopic(string path, bool canCreate, Action<CreateTopicOptions> action = null) => SwallowExceptionIfEntityExists(async () =>
7575
{
7676
if (await _adminClient.TopicExistsAsync(path)) return TopologyCreationStatus.Exists;
7777

src/Tests/SlimMessageBus.Host.AzureServiceBus.Test/ServiceBusMessageBusTests.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Globalization;
55

66
using Azure.Messaging.ServiceBus;
7+
using Azure.Messaging.ServiceBus.Administration;
78

89
using SlimMessageBus.Host;
910
using SlimMessageBus.Host.Collections;
@@ -47,6 +48,11 @@ public ServiceBusMessageBusTests()
4748
SenderMockByPath.Add(path, m);
4849
return m.Object;
4950
},
51+
AdminClientFactory = (_, _) =>
52+
{
53+
var adminClient = new Mock<ServiceBusAdministrationClient>();
54+
return adminClient.Object;
55+
},
5056
TopologyProvisioning = new ServiceBusTopologySettings
5157
{
5258
Enabled = false

0 commit comments

Comments
 (0)