Skip to content

Commit 609da1e

Browse files
authored
feat(msg.routing): add message handler summary logging (#648)
Provides a summary-logging structure during the message routing which focuses on the functionality rather than the technicality of message handler registration. An example of these logs: ### Error, that results in dead-lettering ```log 2025-09-08T08:38:42 Debug > [Received] message (message ID="c353d940-fb6e-4990-a8d4-3e15be9e874e") on Azure Service Bus Queue message pump 2025-09-08T08:38:42 Debug > Message "c353d940-fb6e-4990-a8d4-3e15be9e874e" [Skipped by] "PassThruOrderMessageHandler" => "✗ custom context filter failed (returns 'false')" 2025-09-08T08:38:42 Debug > Message "c353d940-fb6e-4990-a8d4-3e15be9e874e" [Skipped by] "ShipmentAzureServiceBusMessageHandler" => "✗ default JSON body parsing failed (exception thrown)": System.Text.Json.JsonException: The JSON property 'Amount' could not be mapped to any .NET member contained in type 'Arcus.Messaging.Tests.Core.Messages.v1.Shipment'. 2025-09-08T08:38:42 Debug > Message "c353d940-fb6e-4990-a8d4-3e15be9e874e" [Skipped by] "OrderBatchMessageHandler" => " ✓ custom context filter passed (against type=AzureServiceBusMessageContext) ✗ default JSON body parsing failed (exception thrown)": System.Text.Json.JsonException: The JSON property 'Id' could not be mapped to any .NET member contained in type 'Arcus.Messaging.Tests.Core.Messages.v1.OrderBatch'. 2025-09-08T08:38:42 Error > Message '"c353d940-fb6e-4990-a8d4-3e15be9e874e"' [Failed in] message pump => ✗ no matched handler handled found for message 2025-09-08T08:38:42 Debug > [Settle:DeadLetter] message (message ID="c353d940-fb6e-4990-a8d4-3e15be9e874e") on Azure Service Bus Queue message pump => "no matched handler handled found for message" ``` ### Success, after skipping some handlers ```log 2025-09-08T08:40:10 Debug > [Received] message (message ID="9988e52e-030b-ea6c-9bf5-0629cb0727cf") on Azure Service Bus Queue message pump 2025-09-08T08:40:10 Debug > Message "9988e52e-030b-ea6c-9bf5-0629cb0727cf" [Skipped by] "PassThruOrderMessageHandler" => "✗ custom context filter failed (returns 'false')" 2025-09-08T08:40:10 Debug > Message "9988e52e-030b-ea6c-9bf5-0629cb0727cf" [Skipped by] "PassThruOrderMessageHandler" => "✗ custom context filter failed (returns 'false')" 2025-09-08T08:40:10 Trace > [Test] Write order v1 message to disk: "9988e52e-030b-ea6c-9bf5-0629cb0727cf" 2025-09-08T08:40:10 Debug > "Order" "9988e52e-030b-ea6c-9bf5-0629cb0727cf" [Processed by] "WriteOrderToDiskAzureServiceBusMessageHandler" => " ✓ custom context filter passed (against type=AzureServiceBusMessageContext) ✓ default JSON body parsing passed (additional members=Error) ✓ custom body filter passed (against type=Order)" 2025-09-08T08:40:10 Information > Message '"9988e52e-030b-ea6c-9bf5-0629cb0727cf"' was processed by "WriteOrderToDiskAzureServiceBusMessageHandler" | skipped by "2 other handlers" ``` The `DEBUG` is used to write the matching pre-checks, `INFO`/`ERROR` is used as the final summary, written as a single sentence. To accomplish this, this PR places all the logging within the non-Azure Service Bus-related message routing functionality. This gives us the opportunity to only focus on Azure Service Bus-related code in the `ServiceBusMessageRouter` 👉 which it should be. Furthermore, the integration tests have been enhanced to always add 'unrelated' message handlers, to trigger the message routing more and not to rely on unit tests (which are not only there to test deprecated functionality). Closes #647
1 parent d766c62 commit 609da1e

File tree

14 files changed

+595
-212
lines changed

14 files changed

+595
-212
lines changed

src/Arcus.Messaging.Abstractions.ServiceBus/Extensions/ServiceBusMessageHandlerOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ public class ServiceBusMessageHandlerOptions<TMessage>
1919
private readonly Collection<Func<AzureServiceBusMessageContext, bool>> _messageContextFilters = [];
2020

2121
internal Func<IServiceProvider, IMessageBodySerializer> MessageBodySerializerImplementationFactory { get; private set; }
22-
internal Func<TMessage, bool> MessageBodyFilter => msg => _messageBodyFilters.All(filter => filter(msg));
23-
internal Func<AzureServiceBusMessageContext, bool> MessageContextFilter => ctx => _messageContextFilters.All(filter => filter(ctx));
22+
internal Func<TMessage, bool> MessageBodyFilter => _messageBodyFilters.Count is 0 ? null : msg => _messageBodyFilters.All(filter => filter(msg));
23+
internal Func<AzureServiceBusMessageContext, bool> MessageContextFilter => _messageContextFilters.Count is 0 ? null : ctx => _messageContextFilters.All(filter => filter(ctx));
2424

2525
/// <summary>
2626
/// Adds a custom serializer instance that deserializes the incoming <see cref="ServiceBusReceivedMessage.Body"/>.

src/Arcus.Messaging.Abstractions/Arcus.Messaging.Abstractions.csproj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@
2626
<None Include="..\..\docs\static\img\icon.png" Pack="true" PackagePath="\" />
2727
</ItemGroup>
2828

29+
30+
<ItemGroup>
31+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="[8.*,10.0.0)" />
32+
</ItemGroup>
33+
2934
<ItemGroup>
3035
<!-- TODO: will be removed in v3.0 -->
3136
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.21.0" />

src/Arcus.Messaging.Abstractions/MessageHandling/MessageHandler.cs

Lines changed: 170 additions & 91 deletions
Large diffs are not rendered by default.

src/Arcus.Messaging.Abstractions/MessageHandling/MessageRouter.cs

Lines changed: 338 additions & 21 deletions
Large diffs are not rendered by default.

src/Arcus.Messaging.Pumps.ServiceBus/ServiceBusMessagePump.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,6 @@ protected async Task<MessageProcessingResult> RouteMessageAsync(ServiceBusReceiv
166166
return MessageProcessingResult.Failure(message.MessageId, MessageProcessingError.ProcessingInterrupted, "Cannot process received message as the message pump is shutting down");
167167
}
168168

169-
if (string.IsNullOrEmpty(message.CorrelationId))
170-
{
171-
Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, EntityType, JobId);
172-
}
173-
174169
#pragma warning disable CS0618 // Type or member is obsolete
175170
using MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message);
176171

src/Arcus.Messaging.Pumps.ServiceBus/ServiceBusMessageRouter.cs

Lines changed: 25 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -87,50 +87,37 @@ private async Task<MessageProcessingResult> TryRoutingMessageViaRegisteredMessag
8787
MessageCorrelationInfo correlationInfo,
8888
CancellationToken cancellationToken)
8989
{
90-
try
91-
{
92-
MessageHandler[] messageHandlers = GetRegisteredMessageHandlers(serviceProvider).ToArray();
93-
if (messageHandlers.Length <= 0)
94-
{
95-
await DeadLetterMessageNoHandlerRegisteredAsync(messageContext);
96-
return MessageProcessingResult.Failure(message.MessageId, CannotFindMatchedHandler, "Failed to process message in the message pump as no message handler is registered in the dependency container");
97-
}
90+
using var _ = Logger.BeginScope(new Dictionary<string, string> { ["MessageId"] = messageContext.MessageId });
9891

99-
string messageBody = LoadMessageBody(message, messageContext);
100-
bool hasGoneThroughMessageHandler = false;
92+
Logger.LogDebug("[Received] message (message ID={MessageId}) on Azure Service Bus {EntityType} message pump", messageContext.MessageId, messageContext.EntityType);
10193

102-
foreach (MessageHandler messageHandler in messageHandlers)
103-
{
104-
MessageResult result = await DeserializeMessageForHandlerAsync(messageBody, messageContext, messageHandler);
105-
if (result.IsSuccess)
106-
{
107-
bool isProcessed = await messageHandler.ProcessMessageAsync(result.DeserializedMessage, messageContext, correlationInfo, cancellationToken);
108-
109-
hasGoneThroughMessageHandler = true;
110-
if (isProcessed)
111-
{
112-
await PotentiallyAutoCompleteMessageAsync(messageContext);
113-
return MessageProcessingResult.Success(message.MessageId);
114-
}
115-
}
116-
}
94+
string messageBody = LoadMessageBody(message, messageContext);
11795

118-
if (hasGoneThroughMessageHandler)
119-
{
120-
await AbandonMessageMatchedHandlerFailedAsync(messageContext);
121-
return MessageProcessingResult.Failure(message.MessageId, MatchedHandlerFailed, "Failed to process Azure Service Bus message in pump as the matched handler did not successfully processed the message");
122-
}
96+
MessageProcessingResult result =
97+
await RouteMessageThroughRegisteredHandlersAsync(serviceProvider, messageBody, messageContext, correlationInfo, cancellationToken);
12398

124-
await DeadLetterMessageNoHandlerMatchedAsync(messageContext);
125-
return MessageProcessingResult.Failure(message.MessageId, CannotFindMatchedHandler, "Failed to process Azure Service Bus message in pump as no message handler was matched against the message");
99+
if (result.IsSuccessful)
100+
{
101+
await PotentiallyAutoCompleteMessageAsync(messageContext);
126102
}
127-
catch (Exception exception)
103+
else
128104
{
129-
Logger.LogCritical(exception, "Unable to process message with ID '{MessageId}'", message.MessageId);
130-
131-
await messageContext.AbandonMessageAsync(new Dictionary<string, object>(), CancellationToken.None);
132-
return MessageProcessingResult.Failure(message.MessageId, ProcessingInterrupted, "Failed to process message in pump as there was an unexpected critical problem during processing, please see the logs for more information", exception);
105+
switch (result.Error)
106+
{
107+
case ProcessingInterrupted:
108+
case MatchedHandlerFailed:
109+
Logger.LogDebug("[Settle:Abandon] message (message ID={MessageId}) on Azure Service Bus {EntityType} message pump => {ErrorMessage}", messageContext.MessageId, messageContext.EntityType, result.ErrorMessage);
110+
await messageContext.AbandonMessageAsync(new Dictionary<string, object>(), CancellationToken.None);
111+
break;
112+
113+
case CannotFindMatchedHandler:
114+
Logger.LogDebug("[Settle:DeadLetter] message (message ID={MessageId}) on Azure Service Bus {EntityType} message pump => {ErrorMessage}", messageContext.MessageId, messageContext.EntityType, result.ErrorMessage);
115+
await messageContext.DeadLetterMessageAsync(CannotFindMatchedHandler.ToString(), result.ErrorMessage, CancellationToken.None);
116+
break;
117+
}
133118
}
119+
120+
return result;
134121
}
135122

136123
private static string LoadMessageBody(ServiceBusReceivedMessage message, AzureServiceBusMessageContext context)
@@ -165,33 +152,14 @@ private async Task PotentiallyAutoCompleteMessageAsync(AzureServiceBusMessageCon
165152
{
166153
try
167154
{
168-
Logger.LogTrace("Auto-complete message '{MessageId}' (if needed) after processing in Azure Service Bus in message pump '{JobId}'", messageContext.MessageId, messageContext.JobId);
169155
await messageContext.CompleteMessageAsync(CancellationToken.None);
170156
}
171157
catch (ServiceBusException exception) when (exception.Reason is ServiceBusFailureReason.MessageLockLost)
172158
{
173-
Logger.LogTrace(exception, "Message '{MessageId}' on Azure Service Bus in message pump '{JobId}' does not need to be auto-completed, because it was already settled", messageContext.MessageId, messageContext.JobId);
159+
Logger.LogTrace(exception, "[Skipped] auto-completion of message '{MessageId}' in Azure Service Bus message pump (already settled)", messageContext.MessageId);
174160
}
175161
}
176162
}
177-
178-
private async Task DeadLetterMessageNoHandlerRegisteredAsync(AzureServiceBusMessageContext messageContext)
179-
{
180-
Logger.LogError("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as no message handlers were registered in the application services, dead-lettering message!", messageContext.MessageId, messageContext.JobId);
181-
await messageContext.DeadLetterMessageAsync(CannotFindMatchedHandler.ToString(), "No message handlers were registered in the application services", CancellationToken.None);
182-
}
183-
184-
private async Task DeadLetterMessageNoHandlerMatchedAsync(AzureServiceBusMessageContext messageContext)
185-
{
186-
Logger.LogError("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as no registered message handler was matched against the message, dead-lettering message!", messageContext.MessageId, messageContext.JobId);
187-
await messageContext.DeadLetterMessageAsync(CannotFindMatchedHandler.ToString(), "No registered message handler was matched against the message", CancellationToken.None);
188-
}
189-
190-
private async Task AbandonMessageMatchedHandlerFailedAsync(AzureServiceBusMessageContext messageContext)
191-
{
192-
Logger.LogDebug("Failed to process Azure Service Bus message '{MessageId}' in pump '{JobId}' as the matched message handler did not successfully process the message, abandoning message!", messageContext.MessageId, messageContext.JobId);
193-
await messageContext.AbandonMessageAsync(new Dictionary<string, object>(), CancellationToken.None);
194-
}
195163
}
196164

197165
internal static class ILoggerExtensions

src/Arcus.Messaging.Pumps.ServiceBus/ServiceBusReceiverMessagePump.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ protected override async Task StartProcessingMessagesAsync(CancellationToken can
7272
https://github.com/arcus-azure/arcus.messaging/issues/176 */
7373
#pragma warning restore S1135
7474

75-
Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' started", EntityType, JobId, EntityName, Namespace);
75+
Logger.LogTrace("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in namespace '{Namespace}' started", EntityType, JobId, EntityName, Namespace);
7676

7777
_receiveMessagesCancellation = new CancellationTokenSource();
7878
while (CircuitState.IsClosed
@@ -179,7 +179,7 @@ protected override async Task StopProcessingMessagesAsync()
179179
IsStarted = false;
180180
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open);
181181

182-
Logger.LogInformation("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}' closed : {Time}", EntityType, JobId, EntityName, Namespace, DateTimeOffset.UtcNow);
182+
Logger.LogTrace("Azure Service Bus {EntityType} message pump '{JobId}' on entity path '{EntityPath}' in '{Namespace}' closed : {Time}", EntityType, JobId, EntityName, Namespace, DateTimeOffset.UtcNow);
183183

184184
if (_receiveMessagesCancellation != null)
185185
{
@@ -210,11 +210,6 @@ private async Task<MessageProcessingResult> ProcessMessageAsync(ServiceBusReceiv
210210
return MessageProcessingResult.Failure("<unavailable>", MessageProcessingError.ProcessingInterrupted, "Cannot process received message as the message is was 'null'");
211211
}
212212

213-
if (string.IsNullOrEmpty(message.CorrelationId))
214-
{
215-
Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} message pump '{JobId}'", message.MessageId, EntityType, JobId);
216-
}
217-
218213
var messageContext = AzureServiceBusMessageContext.Create(JobId, EntityType, _messageReceiver, message);
219214

220215
MessageProcessingResult routingResult = await RouteMessageAsync(message, messageContext, cancellationToken);

src/Arcus.Messaging.Pumps.ServiceBus/ServiceBusSessionMessagePump.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,6 @@ private async Task ProcessMessageAsync(ProcessSessionMessageEventArgs arg)
7474
return;
7575
}
7676

77-
if (string.IsNullOrEmpty(message.CorrelationId))
78-
{
79-
Logger.LogTrace("No operation ID was found on the message '{MessageId}' during processing in the Azure Service Bus {EntityType} session-aware message pump '{JobId}'", message.MessageId, EntityType, JobId);
80-
}
81-
8277
var messageContext = AzureServiceBusMessageContext.Create(JobId, EntityType, arg);
8378
await RouteMessageAsync(message, messageContext, arg.CancellationToken);
8479
}

src/Arcus.Messaging.Tests.Core/ServiceBus/MessageBodyHandlers/OrderBatchMessageBodySerializer.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,14 @@ public OrderBatchMessageBodySerializer(ILogger<OrderBatchMessageBodySerializer>
2828
/// </returns>
2929
public Task<MessageResult> DeserializeMessageAsync(string messageBody)
3030
{
31-
_logger.LogTrace("Start deserializing to an 'Order'...");
3231
var order = JsonConvert.DeserializeObject<Order>(messageBody);
33-
32+
3433
if (order is null)
3534
{
36-
_logger.LogError("Cannot deserialize incoming message to an 'Order', so can't use 'Order'");
3735
return Task.FromResult(MessageResult.Failure("Cannot deserialize incoming message to an 'Order', so can't use 'Order'"));
3836
}
3937

40-
_logger.LogInformation("Deserialized to an 'Order', using 'Order'");
41-
return Task.FromResult(MessageResult.Success(new OrderBatch { Orders = new [] { order } }));
38+
return Task.FromResult(MessageResult.Success(new OrderBatch { Orders = new[] { order } }));
4239
}
4340
}
4441
}

src/Arcus.Messaging.Tests.Core/ServiceBus/MessageHandlers/WriteOrderToDiskAzureServiceBusMessageHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public async Task ProcessMessageAsync(
3030
MessageCorrelationInfo correlationInfo,
3131
CancellationToken cancellationToken)
3232
{
33-
_logger.LogTrace("Write order v1 message to disk: {MessageId}", message.Id);
33+
_logger.LogTrace("[Test] Write order v1 message to disk: {MessageId}", message.Id);
3434

3535
string fileName = message.Id + ".json";
3636
string dirPath = Directory.GetCurrentDirectory();

0 commit comments

Comments
 (0)