Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,18 @@ private static Func<object, MessageHandlerSummary, bool> DetermineMessageBodyFil

if (rawMessage is not TMessage message)
{
summary.AddFailed("custom body filter failed", ("requires type", typeof(TMessage).Name));
summary.AddFailed("custom body filter failed", check => check.AddMember("requires type", typeof(TMessage).Name));
return false;
}

bool matches = messageBodyFilter(message);
if (matches)
{
summary.AddPassed("custom body filter passed", ("against type", typeof(TMessage).Name));
summary.AddPassed("custom body filter passed", check => check.AddMember("against type", typeof(TMessage).Name));
return true;
}

summary.AddFailed("custom body filter failed", reason: "returns 'false'");
summary.AddFailed("custom body filter failed", check => check.AddReason("returns 'false'"));
return false;
};
}
Expand All @@ -180,7 +180,7 @@ private static Func<MessageContext, MessageHandlerSummary, bool> DetermineMessag

if (jobId is not null && rawContext.JobId != jobId)
{
summary.AddFailed("custom context filter failed", ("requires job ID", rawContext.JobId));
summary.AddFailed("custom context filter failed", check => check.AddMember("requires job ID", rawContext.JobId));
return false;
}

Expand All @@ -191,18 +191,18 @@ private static Func<MessageContext, MessageHandlerSummary, bool> DetermineMessag

if (rawContext is not TMessageContext messageContext)
{
summary.AddFailed("custom context filter filter failed", ("requires type", typeof(TMessageContext).Name));
summary.AddFailed("custom context filter failed", check => check.AddMember("requires type", typeof(TMessageContext).Name));
return false;
}

bool matches = messageContextFilter(messageContext);
if (matches)
{
summary.AddPassed("custom context filter passed", ("against type", typeof(TMessageContext).Name));
summary.AddPassed("custom context filter passed", check => check.AddMember("against type", typeof(TMessageContext).Name));
return true;
}

summary.AddFailed("custom context filter failed", reason: "returns 'false'");
summary.AddFailed("custom context filter failed", check => check.AddReason("returns 'false'"));
return false;
};
}
Expand Down Expand Up @@ -283,19 +283,34 @@ internal async Task<MessageResult> TryCustomDeserializeMessageAsync(string messa
return MessageResult.Failure("n/a");
}

Task<MessageResult> deserializeMessageAsync = _messageBodySerializer.DeserializeMessageAsync(message);
Type serializerType = _messageBodySerializer.GetType();

if (deserializeMessageAsync is null)
MessageResult result = null;
try
{
summary.AddFailed("custom body parsing failed", ("using type", serializerType.Name), "returns 'null'");
return MessageResult.Failure("n/a");
}
Task<MessageResult> deserializeMessageAsync = _messageBodySerializer.DeserializeMessageAsync(message);

MessageResult result = await deserializeMessageAsync;
if (result is null)
if (deserializeMessageAsync is null)
{
summary.AddFailed("custom body parsing failed",
check => check.AddMember("using type", serializerType.Name)
.AddReason("returns 'null'"));

return MessageResult.Failure("n/a");
}

result = await deserializeMessageAsync;
if (result is null)
{
summary.AddFailed("custom body parsing failed",
check => check.AddMember("using type", serializerType.Name)
.AddReason("returns 'null'"));

return MessageResult.Failure("n/a");
}
}
catch (Exception exception)
{
summary.AddFailed("custom body parsing failed", ("using type", serializerType.Name), "returns 'null'");
summary.AddFailed(exception, "custom body parsing failed", check => check.AddMember("using type", serializerType.Name));
return MessageResult.Failure("n/a");
}

Expand All @@ -304,15 +319,18 @@ internal async Task<MessageResult> TryCustomDeserializeMessageAsync(string messa
Type deserializedMessageType = result.DeserializedMessage.GetType();
if (deserializedMessageType == MessageType || deserializedMessageType.IsSubclassOf(MessageType))
{
summary.AddPassed("custom body parsing passed", ("using type", serializerType.Name));
summary.AddPassed("custom body parsing passed", check => check.AddMember("using type", serializerType.Name));
return result;
}

summary.AddFailed("custom body parsing failed", ("using type", serializerType.Name), ("requires type", MessageType.Name), ("but got type", deserializedMessageType.Name));
summary.AddFailed("custom body parsing failed",
check => check.AddMember("using deserializer type", serializerType.Name)
.AddReason($"requires message type={MessageType.Name}, got type={deserializedMessageType.Name}"));

return MessageResult.Failure("n/a");
}

summary.AddFailed(result.Exception, "custom body parsing failed", result.ErrorMessage);
summary.AddFailed(result.Exception, "custom body parsing failed", check => check.AddReason(result.ErrorMessage));
return MessageResult.Failure("n/a");
}

Expand Down
180 changes: 97 additions & 83 deletions src/Arcus.Messaging.Abstractions/MessageHandling/MessageRouter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
Expand Down Expand Up @@ -107,34 +108,27 @@ protected async Task<MessageProcessingResult> RouteMessageThroughRegisteredHandl
return NoHandlersRegistered(messageContext.MessageId);
}

try
int skippedHandlers = 0;
bool hasGoneThroughMessageHandler = false;

foreach (var handler in handlers)
{
int skippedHandlers = 0;
bool hasGoneThroughMessageHandler = false;
MessageProcessingResult result = await ProcessMessageHandlerAsync(handler, messageBody, messageContext, correlationInfo, cancellation);
hasGoneThroughMessageHandler = result.IsSuccessful || result.Error is MessageProcessingError.ProcessingInterrupted;

foreach (var handler in handlers)
if (result.IsSuccessful)
{
MessageProcessingResult result = await ProcessMessageHandlerAsync(handler, messageBody, messageContext, correlationInfo, cancellation);
hasGoneThroughMessageHandler = result.IsSuccessful || result.Error is MessageProcessingError.ProcessingInterrupted;

if (result.IsSuccessful)
{
Logger.LogMessageProcessedSummary(messageContext.MessageId, handler.MessageHandlerType, skippedHandlers);
return result;
}

skippedHandlers++;
Logger.LogMessageProcessedSummary(messageContext.MessageId, handler.MessageHandlerType, skippedHandlers);
return result;
}

return hasGoneThroughMessageHandler
? MatchedHandlerFailed(messageContext.MessageId)
: NoMatchedHandler(messageContext.MessageId);
}
catch (Exception exception)
{
return ExceptionDuringRouting(exception, messageContext.MessageId);
skippedHandlers++;
}

return hasGoneThroughMessageHandler
? MatchedHandlerFailed(messageContext.MessageId)
: NoMatchedHandler(messageContext.MessageId);

MessageProcessingResult NoHandlersRegistered(string messageId)
{
Logger.LogNoHandlersRegistered(messageId);
Expand All @@ -152,12 +146,6 @@ MessageProcessingResult MatchedHandlerFailed(string messageId)
Logger.LogMatchedHandlerFailedToProcessMessage(messageId);
return MessageProcessingResult.Failure(messageId, MessageProcessingError.MatchedHandlerFailed, MatchedHandlerFailedMessage);
}

MessageProcessingResult ExceptionDuringRouting(Exception exception, string messageId)
{
Logger.LogExceptionDuringRouting(exception, messageId);
return MessageProcessingResult.Failure(messageId, MessageProcessingError.ProcessingInterrupted, ExceptionDuringRoutingMessage, exception);
}
}

private async Task<MessageProcessingResult> ProcessMessageHandlerAsync<TMessageContext>(
Expand Down Expand Up @@ -206,7 +194,7 @@ MessageProcessingResult MatchedHandlerSkipped()

MessageProcessingResult MatchedHandlerFailed(Exception exception, string errorMessage)
{
summary.AddFailed(exception, "message processing failed", errorMessage);
summary.AddFailed(exception, "message processing failed", check => check.AddReason(errorMessage));

Logger.LogMessageFailedInHandler(messageType, context.MessageId, handler.MessageHandlerType, summary);
return MessageProcessingResult.Failure(context.MessageId, MessageProcessingError.ProcessingInterrupted, "n/a");
Expand Down Expand Up @@ -278,11 +266,11 @@ private async Task<MessageResult> DeserializeMessageAsync(MessageHandler handler
object deserializedByType = JsonSerializer.Deserialize(messageBody, handlerMessageType, _jsonOptions);
if (deserializedByType != null)
{
summary.AddPassed("default JSON body parsing passed", ("additional members", Options.Deserialization.AdditionalMembers));
summary.AddPassed("default JSON body parsing passed", check => check.AddMember("additional members", Options.Deserialization.AdditionalMembers.ToString()));
return MessageResult.Success(deserializedByType);
}

summary.AddFailed("default JSON body parsing failed", reason: "returns 'null'");
summary.AddFailed("default JSON body parsing failed", check => check.AddReason("returns 'null'"));
return MessageResult.Failure("n/a");
}
catch (JsonException exception)
Expand Down Expand Up @@ -349,9 +337,6 @@ internal static partial class MessageRouterLoggerExtensions
[LoggerMessage(LogLevel.Error, "Message '{MessageId}' [Failed in] message pump => ✗ " + MatchedHandlerFailedMessage)]
internal static partial void LogMatchedHandlerFailedToProcessMessage(this ILogger logger, string messageId);

[LoggerMessage(LogLevel.Critical, "Message '{MessageId}' [Failed in] message pump => ✗ " + ExceptionDuringRoutingMessage)]
internal static partial void LogExceptionDuringRouting(this ILogger logger, Exception exception, string messageId);

internal static void LogMessageSkippedByHandler(this ILogger logger, string messageId, Type messageHandlerType, MessageHandlerSummary summary)
{
LogMessageSkippedByHandler(logger, summary.OccurredException, messageId, messageHandlerType.Name, summary);
Expand Down Expand Up @@ -411,78 +396,107 @@ internal class MessageHandlerSummary
_ => new AggregateException(_exceptions)
};

/// <summary>
/// Adds a passed check to the summary.
/// </summary>
/// <param name="description">The message that describes in a short manner the check that was passed.</param>
/// <param name="members">The additional key-value pair members that were involved in the check.</param>
internal void AddPassed(string description, params (string, object)[] members)
internal class MessageHandlerCheckBuilder
{
ArgumentNullException.ThrowIfNull(description);
private readonly Collection<(string memberName, string memberValue)> _members = [];
private string _reason;
private readonly StringBuilder _result = new();

string membersDescription = members.Length > 0
? $" ({string.Join(", ", members.Select(m => m.Item1 + "=" + m.Item2))})"
: string.Empty;
private MessageHandlerCheckBuilder(string description)
{
ArgumentException.ThrowIfNullOrWhiteSpace(description);
_result.Append(description);
}

_lines.Add("✓ " + description + membersDescription);
}
internal static MessageHandlerCheckBuilder Passed(string description) => new("✓ " + description);
internal static MessageHandlerCheckBuilder Failed(string description) => new("✗ " + description);

/// <summary>
/// Adds a failed check to the summary.
/// </summary>
/// <param name="exception">The optional exception that occured during the check.</param>
/// <param name="description">The message that describes in a short manner the check that was failed.</param>
/// <param name="reason">The message that explains the reason why the check failed.</param>
internal void AddFailed(Exception exception, string description, string reason = "exception thrown")
{
if (exception is not null)
/// <summary>
/// Adds a key-value pair member to the pre-check message - acts as additional context (i.e. 'using type=MyType').
/// </summary>
internal MessageHandlerCheckBuilder AddMember(string memberName, string memberValue)
{
_exceptions.Add(exception);
_members.Add((memberName, memberValue));
return this;
}

AddFailed(description, reason);
/// <summary>
/// Adds a final reason why the pre-check acted like it did.
/// </summary>
internal MessageHandlerCheckBuilder AddReason(string reason)
{
_reason = reason;
return this;
}

/// <summary>
/// Returns a string that represents the current object.
/// </summary>
/// <returns>A string that represents the current object.</returns>
public override string ToString()
{
if (_members.Count > 0)
{
_result.Append(" (");
_result.AppendJoin(", ", _members.Select(m => $"{m.memberName}={m.memberValue}"));
_result.Append(')');
}

if (_reason != null)
{
_result.Append(": ");
_result.Append(_reason);
}

return _result.ToString();
}
}

/// <summary>
/// Adds a failed check to the summary.
/// Adds a passed pre-check line to the summary.
/// </summary>
/// <param name="description">The message that describes in a short manner the check that was failed.</param>
/// <param name="reason">The message that explains the reason why the check failed.</param>
internal void AddFailed(string description, string reason)
/// <param name="description">The short description of the pre-check.</param>
/// <param name="configureCheck">The additional information around the pre-check, formatted on the same line.</param>
internal void AddPassed(string description, Action<MessageHandlerCheckBuilder> configureCheck = null)
{
ArgumentNullException.ThrowIfNull(description);
ArgumentNullException.ThrowIfNull(reason);
var builder = MessageHandlerCheckBuilder.Passed(description);
configureCheck?.Invoke(builder);

AddFailed(description + " (" + reason + ")");
_lines.Add(builder.ToString());
}

/// <summary>
/// Adds a failed check to the summary.
/// Adds a failed pre-check line to the summary.
/// </summary>
/// <param name="description">The message that describes in a short manner the check that was failed.</param>
/// <param name="members">The additional members that were involved in the check (both single values as tuples are supported).</param>
internal void AddFailed(string description, params object[] members)
/// <param name="exception">
/// The occurred exception that caused the pre-check to fail
/// - only here to track exceptions (<see cref="OccurredException"/>), not to expose information in the logged line.
/// </param>
/// <param name="description">The short description of the pre-check.</param>
/// <param name="configureCheck">The additional information around the pre-check, formatted on the same line.</param>
internal void AddFailed(Exception exception, string description, Action<MessageHandlerCheckBuilder> configureCheck = null)
{
ArgumentNullException.ThrowIfNull(description);
ArgumentNullException.ThrowIfNull(members);
ArgumentNullException.ThrowIfNull(exception);
_exceptions.Add(exception);

string membersDescription = members.Length > 1
? $" ({string.Join(", ", members.Select(m =>
{
return m switch
{
(string name, string value) => $"{name}={value}",
_ => m.ToString()
};
}))}"
: string.Empty;

AddFailed(description + membersDescription);
AddFailed(description, check =>
{
check.AddReason("exception thrown");
configureCheck?.Invoke(check);
});
}

private void AddFailed(string description)
/// <summary>
/// Adds a failed pre-check line to the summary.
/// </summary>
/// <param name="description">The short description of the pre-check.</param>
/// <param name="configureCheck">The additional information around the pre-check, formatted on the same line.</param>
internal void AddFailed(string description, Action<MessageHandlerCheckBuilder> configureCheck = null)
{
_lines.Add("✗ " + description);
var builder = MessageHandlerCheckBuilder.Failed(description);
configureCheck?.Invoke(builder);

_lines.Add(builder.ToString());
}

/// <summary>
Expand Down