Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

namespace Arcus.Messaging.Abstractions.ServiceBus
Expand All @@ -19,6 +20,7 @@ public class AzureServiceBusMessageContext : MessageContext
/// <param name="properties">The contextual properties provided on the message provided by the message publisher.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="systemProperties"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
[Obsolete("Will be removed in v3.0, please use the factory method instead: " + nameof(AzureServiceBusMessageContext) + "." + nameof(Create))]
public AzureServiceBusMessageContext(
string messageId,
string jobId,
Expand All @@ -38,6 +40,7 @@ public AzureServiceBusMessageContext(
/// <param name="entityType">The type of the Azure Service Bus entity on which a message with the ID <paramref name="messageId"/> was received.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="systemProperties"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
[Obsolete("Will be removed in v3.0, please use the factory method instead: " + nameof(AzureServiceBusMessageContext) + "." + nameof(Create))]
public AzureServiceBusMessageContext(
string messageId,
string jobId,
Expand All @@ -57,6 +60,33 @@ public AzureServiceBusMessageContext(
EntityType = entityType;
}

private AzureServiceBusMessageContext(
string jobId,
ServiceBusEntityType entityType,
ServiceBusReceiver receiver,
ServiceBusReceivedMessage message)
: base(message.MessageId, jobId, message.ApplicationProperties.ToDictionary(item => item.Key, item => item.Value))
{
FullyQualifiedNamespace = receiver.FullyQualifiedNamespace;
EntityPath = receiver.EntityPath;
EntityType = entityType;
SystemProperties = AzureServiceBusSystemProperties.CreateFrom(message);
LockToken = message.LockToken;
DeliveryCount = message.DeliveryCount;
}

/// <summary>
/// Gets the fully qualified Azure Service bus namespace that the message pump is associated with.
/// This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
/// </summary>
public string FullyQualifiedNamespace { get; }

/// <summary>
/// Gets the path of the Azure Service bus entity that the message pump is connected to,
/// specific to the Azure Service bus namespace that contains it.
/// </summary>
public string EntityPath { get; }

/// <summary>
/// Gets the type of the Azure Service Bus entity on which the message was received.
/// </summary>
Expand All @@ -77,5 +107,37 @@ public AzureServiceBusMessageContext(
/// </summary>
/// <remarks>This increases when a message is abandoned and re-delivered for processing</remarks>
public int DeliveryCount { get; }

/// <summary>
/// Creates a new instance of the <see cref="AzureServiceBusMessageContext"/> based on the current Azure Service bus situation.
/// </summary>
/// <param name="jobId">The unique ID to identity the Azure Service bus message pump that is responsible for pumping messages from the <paramref name="receiver"/>.</param>
/// <param name="entityType">The type of Azure Service bus entity that the <paramref name="receiver"/> receives from.</param>
/// <param name="receiver">The Azure Service bus receiver that is responsible for receiving the <paramref name="message"/>.</param>
/// <param name="message">The Azure Service bus message that is currently being processed.</param>
/// <exception cref="ArgumentNullException">Thrown when one of the parameters is <c>null</c>.</exception>
public static AzureServiceBusMessageContext Create(
string jobId,
ServiceBusEntityType entityType,
ServiceBusReceiver receiver,
ServiceBusReceivedMessage message)
{
if (string.IsNullOrWhiteSpace(jobId))
{
throw new ArgumentException("Requires a non-blank job ID to identity an Azure Service bus message pump", nameof(jobId));
}

if (receiver is null)
{
throw new ArgumentNullException(nameof(receiver));
}

if (message is null)
{
throw new ArgumentNullException(nameof(message));
}

return new AzureServiceBusMessageContext(jobId, entityType, receiver, message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public static class ServiceBusReceivedMessageExtensions
/// </summary>
/// <param name="message">The received Azure Service Bus message to extract the system properties from.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="message"/> is <c>null</c>.</exception>
[Obsolete("Will be removed in v3.0, please use the factory method instead: " + nameof(AzureServiceBusMessageContext) + "." + nameof(AzureServiceBusMessageContext.Create) + " to get the Arcus-created system properties")]
public static AzureServiceBusSystemProperties GetSystemProperties(this ServiceBusReceivedMessage message)
{
if (message is null)
Expand All @@ -32,6 +33,7 @@ public static AzureServiceBusSystemProperties GetSystemProperties(this ServiceBu
/// <param name="jobId">The unique ID to identify the current messaging job, pump or router that is handling the received <paramref name="message"/>.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="message"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
[Obsolete("Will be removed in v3.0, please use the factory method instead: " + nameof(AzureServiceBusMessageContext) + "." + nameof(AzureServiceBusMessageContext.Create))]
public static AzureServiceBusMessageContext GetMessageContext(this ServiceBusReceivedMessage message, string jobId)
{
if (message is null)
Expand All @@ -50,6 +52,7 @@ public static AzureServiceBusMessageContext GetMessageContext(this ServiceBusRec
/// <param name="entityType">The type of the Azure Service Bus entity on which a message was received.</param>
/// <exception cref="ArgumentNullException">Thrown when the <paramref name="message"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
[Obsolete("Will be removed in v3.0, please use the factory method instead: " + nameof(AzureServiceBusMessageContext) + "." + nameof(AzureServiceBusMessageContext.Create))]
public static AzureServiceBusMessageContext GetMessageContext(this ServiceBusReceivedMessage message, string jobId, ServiceBusEntityType entityType)
{
if (message is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ private async Task<MessageProcessingResult> ProcessMessageAsync(ServiceBusReceiv
}

using MessageCorrelationResult correlationResult = DetermineMessageCorrelation(message);
AzureServiceBusMessageContext messageContext = message.GetMessageContext(JobId, Settings.ServiceBusEntity);
var messageContext = AzureServiceBusMessageContext.Create(JobId, Settings.ServiceBusEntity, _messageReceiver, message);

MessageProcessingResult routingResult = await _messageRouter.RouteMessageAsync(_messageReceiver, message, messageContext, correlationResult.CorrelationInfo, cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1139,13 +1139,11 @@ private static ServiceBusMessageHandlerCollection AddServiceBusMessagePump(
AzureServiceBusMessagePumpOptions options =
DetermineMessagePumpOptions(configureQueueMessagePump, configureTopicMessagePump);

#pragma warning disable CS0618 // Type or member is obsolete: message router will be initiated directly in v3.0.
ServiceBusMessageHandlerCollection collection = services.AddServiceBusMessageRouting(provider =>
{
var logger = provider.GetService<ILogger<AzureServiceBusMessageRouter>>();
return new AzureServiceBusMessageRouter(provider, options.Routing, logger);
});
#pragma warning restore CS0618 // Type or member is obsolete
collection.JobId = options.JobId;

services.TryAddSingleton<IMessagePumpLifetime, DefaultMessagePumpLifetime>();
Expand Down