Skip to content
8 changes: 8 additions & 0 deletions docs/preview/03-Features/01-Azure/01-service-bus.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,14 @@ services.AddServiceBus[Topic/Queue]MessagePump(..., options =>
// when deserializing the incoming message (default: AdditionalMemberHandling.Error).
options.Routing.Deserialization.AdditionalMembers = AdditionalMemberHandling.Ignore;

// Sets a function that will be run before the message pump starts receiving messages.
// Useful for when dependent services are not always directly available.
options.Hooks.BeforeStartup(async (IServiceProvider services) =>
{
var dependency = services.GetRequiredService<IMyDependentService>();
await dependency.WaitForStatusAsync(HealthStatus.Available);
});

// Configure the message pump to use sessions to receive messages (with or without additional options).
options.UseSessions();
options.UseSessions(sessions =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions.MessageHandling;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;

Expand Down Expand Up @@ -59,6 +60,11 @@ public string JobId
}
}

/// <summary>
/// Gets the options related to manipulating the lifecycle of the message pump in the application.
/// </summary>
public ServiceBusMessagePumpHooksOptions Hooks { get; } = new();

/// <summary>
/// Gets the consumer-configurable options to change the behavior of the message router.
/// </summary>
Expand All @@ -80,7 +86,7 @@ public void UseSessions()
/// Activates the session-aware message pump that processes messages in the Azure Service Bus
/// with additional configuration options for session handling.
/// </summary>
/// <param name="configureSessionOptions"></param>
/// <param name="configureSessionOptions">The function to manipulate how sessions should be handled by the pump.</param>
public void UseSessions(Action<ServiceBusSessionOptions> configureSessionOptions)
{
RequestedToUseSessions = true;
Expand Down Expand Up @@ -152,4 +158,27 @@ public TimeSpan SessionIdleTimeout
}
}
}

/// <summary>
/// Represents the options related to the <see cref="ServiceBusMessagePumpOptions.Hooks"/> of the registered Azure Service Bus message pump.
/// </summary>
public class ServiceBusMessagePumpHooksOptions
{
internal Func<IServiceProvider, Task> BeforeStartupAsync { get; set; } = _ => Task.CompletedTask;

/// <summary>
/// Sets a function on the message pump that should run before the pump receives Azure Service Bus messages.
/// Useful for when dependent systems are not always directly available.
/// </summary>
/// <remarks>
/// ⚠️ Multiple calls will override each other.
/// </remarks>
/// <param name="beforeStartupAsync">The function that upon completion 'triggers' the message pump to be started.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="beforeStartupAsync"/> is <c>null</c>.</exception>
public void BeforeStartup(Func<IServiceProvider, Task> beforeStartupAsync)
{
ArgumentNullException.ThrowIfNull(beforeStartupAsync);
BeforeStartupAsync = beforeStartupAsync;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ protected ServiceBusMessagePump(
/// <remarks>See <see href="https://docs.microsoft.com/dotnet/core/extensions/workers">Worker Services in .NET</see> for implementation guidelines.</remarks>
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Options.Hooks.BeforeStartupAsync(ServiceProvider);
try
{
await StartProcessingMessagesAsync(stoppingToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.ServiceBus.Configuration;
using Arcus.Messaging.Tests.Core.Correlation;
using Arcus.Messaging.Tests.Core.Events.v1;
using Arcus.Messaging.Tests.Core.Generators;
Expand All @@ -23,6 +22,7 @@
using Newtonsoft.Json;
using Xunit;
using ServiceBusEntityType = Arcus.Messaging.Abstractions.ServiceBus.ServiceBusEntityType;
using ServiceBusMessagePumpOptions = Arcus.Messaging.Pumps.ServiceBus.Configuration.ServiceBusMessagePumpOptions;

namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
{
Expand All @@ -31,6 +31,9 @@ namespace Arcus.Messaging.Tests.Integration.MessagePump.Fixture
/// </summary>
internal class ServiceBusTestContext : IAsyncDisposable
{
private enum ServiceBusOperation { TriggerRun, ClientCreation }
private readonly Collection<(DateTimeOffset time, ServiceBusOperation type)> _timedOperations = [];

private readonly TemporaryServiceBusEntityState _serviceBus;
private readonly ServiceBusConfig _serviceBusConfig;
private readonly List<IAsyncDisposable> _disposables = [];
Expand All @@ -51,6 +54,7 @@ private ServiceBusTestContext(TemporaryServiceBusEntityState serviceBus, ILogger

private TemporaryQueue Queue => UseSessions ? _serviceBus.QueueWithSession : _serviceBus.Queue;
private TemporaryTopic Topic => _serviceBus.Topic;
private bool UseTrigger { get; } = Bogus.Random.Bool();

/// <summary>
/// Gets or sets a value indicating whether the Azure Service Bus message pump managed by the test context should use sessions.
Expand Down Expand Up @@ -102,15 +106,25 @@ internal ServiceBusMessageHandlerCollection WhenOnlyServiceBusQueueMessagePump(A
string sessionAwareDescription = UseSessions ? " session-aware" : string.Empty;
_logger.LogTrace("[Test:Setup] Register Azure Service Bus{SessionDescription} queue message pump", sessionAwareDescription);

return Services.AddServiceBusQueueMessagePump(Queue.Name, _serviceBusConfig.HostName, new DefaultAzureCredential(), options =>
return UseTrigger
? Services.AddServiceBusQueueMessagePump(Queue.Name, _ => CreateServiceBusClient(), ConfigureWithTrigger)
: Services.AddServiceBusQueueMessagePump(Queue.Name, _serviceBusConfig.HostName, new DefaultAzureCredential(), ConfigureWithoutTrigger);

void ConfigureWithoutTrigger(ServiceBusMessagePumpOptions options)
{
if (UseSessions)
{
options.UseSessions();
}

configureOptions?.Invoke(options);
});
}

void ConfigureWithTrigger(ServiceBusMessagePumpOptions options)
{
ConfigureWithoutTrigger(options);
AddServiceBusBeforeStartupHook(options);
}
}

/// <summary>
Expand All @@ -124,17 +138,43 @@ internal ServiceBusMessageHandlerCollection WhenServiceBusTopicMessagePump(Actio
string sessionAwareDescription = UseSessions ? " session-aware" : string.Empty;
_logger.LogTrace("[Test:Setup] Register Azure Service Bus{SessionDescription} topic message pump", sessionAwareDescription);

return Services.AddServiceBusTopicMessagePump(Topic.Name, subscriptionName, _serviceBusConfig.HostName, new DefaultAzureCredential(), options =>
var collection = UseTrigger
? Services.AddServiceBusTopicMessagePump(Topic.Name, subscriptionName, _ => CreateServiceBusClient(), ConfigureWithTrigger)
: Services.AddServiceBusTopicMessagePump(Topic.Name, subscriptionName, _serviceBusConfig.HostName, new DefaultAzureCredential(), ConfigureWithoutTrigger);

return collection.WithUnrelatedServiceBusMessageHandler()
.WithUnrelatedServiceBusMessageHandler();

void ConfigureWithoutTrigger(ServiceBusMessagePumpOptions options)
{
if (UseSessions)
{
options.UseSessions(sessions => sessions.SessionIdleTimeout = TimeSpan.FromSeconds(1));
}

configureOptions?.Invoke(options);
}

}).WithUnrelatedServiceBusMessageHandler()
.WithUnrelatedServiceBusMessageHandler();
void ConfigureWithTrigger(ServiceBusMessagePumpOptions options)
{
ConfigureWithoutTrigger(options);
AddServiceBusBeforeStartupHook(options);
}
}

private ServiceBusClient CreateServiceBusClient()
{
_timedOperations.Add((DateTimeOffset.UtcNow, ServiceBusOperation.ClientCreation));
return new ServiceBusClient(_serviceBusConfig.HostName, new DefaultAzureCredential());
}

private void AddServiceBusBeforeStartupHook(ServiceBusMessagePumpOptions options)
{
options.Hooks.BeforeStartup(_ =>
{
_timedOperations.Add((DateTimeOffset.UtcNow, ServiceBusOperation.TriggerRun));
return Task.CompletedTask;
});
}

/// <summary>
Expand Down Expand Up @@ -225,6 +265,7 @@ private ServiceBusMessage[] CreateMessages(Action<ServiceBusMessageBuilder>[] co
/// </summary>
internal async Task ShouldConsumeViaMatchedHandlerAsync(IEnumerable<ServiceBusMessage> messages)
{
AssertHooks();
foreach (var message in messages)
{
OrderCreatedEventData eventData = await DiskMessageEventConsumer.ConsumeOrderCreatedAsync(message.MessageId);
Expand All @@ -237,6 +278,7 @@ internal async Task ShouldConsumeViaMatchedHandlerAsync(IEnumerable<ServiceBusMe
/// </summary>
internal async Task ShouldCompleteConsumedAsync(IEnumerable<ServiceBusMessage> messages)
{
AssertHooks();
foreach (var message in messages)
{
await Poll.Target(() => Queue.Messages.Where(msg => msg.MessageId == message.MessageId).ToListAsync())
Expand All @@ -250,6 +292,7 @@ await Poll.Target(() => Queue.Messages.Where(msg => msg.MessageId == message.Mes
/// </summary>
internal async Task ShouldNotConsumeButDeadLetterAsync(IEnumerable<ServiceBusMessage> messages)
{
AssertHooks();
foreach (var message in messages)
{
await Poll.Target(() => Queue.Messages.FromDeadLetter().Where(msg => msg.MessageId == message.MessageId).ToListAsync())
Expand All @@ -265,9 +308,9 @@ await Poll.Target(() => Queue.Messages.FromDeadLetter().Where(msg => msg.Message
/// </summary>
internal async Task ShouldNotConsumeButAbandonAsync(IEnumerable<ServiceBusMessage> messages)
{
AssertHooks();
foreach (var message in messages)
{

await Poll.Target(() => Queue.Messages.Where(msg => msg.MessageId == message.MessageId && msg.DeliveryCount > 0).ToListAsync())
.Until(abandoned => abandoned.Count > 0)
.Every(TimeSpan.FromMilliseconds(100))
Expand All @@ -276,6 +319,20 @@ await Poll.Target(() => Queue.Messages.Where(msg => msg.MessageId == message.Mes
}
}

private void AssertHooks()
{
if (UseTrigger)
{
Assert.Collection(_timedOperations,
op => Assert.Equal(ServiceBusOperation.TriggerRun, op.type),
op => Assert.Equal(ServiceBusOperation.ClientCreation, op.type));

DateTimeOffset[] times = _timedOperations.Select(op => op.time).ToArray();
Assert.True(times.Order().SequenceEqual(times),
$"Service Bus operations should be run in the expected order, but weren't: {string.Join(", ", times.Select(t => t.ToString("s")))}");
}
}

private static void AssertReceivedOrderEventDataForW3C(
ServiceBusMessage message,
OrderCreatedEventData receivedEventData)
Expand Down