Skip to content
This repository was archived by the owner on Sep 10, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
*.user
*.userosscache
*.sln.docstates
*.local.json

# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs
Expand Down
6 changes: 1 addition & 5 deletions build/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,7 @@ variables:
- group: 'Arcus Security - Integration Testing'
- group: 'Arcus - GitHub Package Registry'
- group: 'Build Configuration'
# Always use fixed version for .NET Core SDK
- name: 'DotNet.Sdk.Version'
value: '3.0.101'
- name: 'Project'
value: 'Arcus.BackgroundJobs'
- template: ./variables/build.yml
# 'Package.Version.ManualTrigger' is added as queue-time variable on build in Azure DevOps

stages:
Expand Down
3 changes: 1 addition & 2 deletions build/nuget-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ variables:
- group: 'Arcus Security - Integration Testing'
- group: 'Build Configuration'
- group: 'Arcus Background Jobs - .NET'
- name: 'Project'
value: 'Arcus.BackgroundJobs'
- template: ./variables/build.yml
- name: 'GitHub.Repository'
value: 'arcus-azure/arcus.backgroundjobs'
# 'Package.Version' is added as queue-time variable on build in Azure DevOps
Expand Down
3 changes: 3 additions & 0 deletions build/variables/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
variables:
DotNet.Sdk.Version: '3.0.101'
Project: 'Arcus.BackgroundJobs'
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Arcus.EventGrid" Version="3.0.0-preview-1" />
<PackageReference Include="Arcus.Messaging.Pumps.ServiceBus" Version="0.1.0-preview-2" />
<PackageReference Include="Microsoft.Azure.EventGrid" Version="3.2.0" />
<PackageReference Include="Arcus.EventGrid" Version="3.0.0" />
<PackageReference Include="Arcus.Messaging.Pumps.ServiceBus" Version="0.1.0-preview-3" />
</ItemGroup>

</Project>
131 changes: 30 additions & 101 deletions src/Arcus.BackgroundJobs.CloudEvents/CloudEventBackgroundJob.cs
Original file line number Diff line number Diff line change
@@ -1,136 +1,65 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Arcus.Messaging.Abstractions;
using Arcus.Messaging.Pumps.ServiceBus;
using Arcus.Messaging.Pumps.ServiceBus.Configuration;
using CloudNative.CloudEvents;
using GuardNet;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Management;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Arcus.BackgroundJobs.CloudEvents
{
/// <summary>
/// Representing a Azure Service Bus Topic message pump that will create and delete a Service Bus Topic subscription during the lifetime of the pump.
/// </summary>
public abstract class CloudEventBackgroundJob : AzureServiceBusMessagePump<CloudNative.CloudEvents.CloudEvent>
public class CloudEventBackgroundJob : AzureServiceBusMessagePump
{
private static readonly JsonEventFormatter JsonEventFormatter = new JsonEventFormatter();

/// <summary>
/// Initializes a new instance of the <see cref="CloudEventBackgroundJob"/> class.
/// </summary>
/// <param name="settings">The settings to influence the behavior of the message pump.</param>
/// <param name="configuration">The configuration of the application.</param>
/// <param name="serviceProvider">The collection of services that are configured.</param>
/// <param name="options">The options to further configure this job.</param>
/// <param name="logger">The logger to write telemetry to.</param>
/// <exception cref="ArgumentNullException">When the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
/// <exception cref="ArgumentException">When the <paramref name="serviceProvider"/> doesn't have a registered <see cref="AzureServiceBusMessagePumpSettings"/> instance.</exception>
protected CloudEventBackgroundJob(
public CloudEventBackgroundJob(
AzureServiceBusMessagePumpSettings settings,
IConfiguration configuration,
IServiceProvider serviceProvider,
IOptions<CloudEventBackgroundJobOptions> options,
ILogger logger) : base(configuration, serviceProvider, logger)
ILogger<AzureServiceBusMessagePump> logger)
: base(settings, configuration, serviceProvider, logger)
{
Guard.NotNull(
serviceProvider,
nameof(serviceProvider),
$"Requires a '{nameof(IServiceProvider)}' implementation to retrieve the '{nameof(AzureServiceBusMessagePumpSettings)}'");
Guard.NotNull(options, nameof(options), $"Requires a '{nameof(IOptions<CloudEventBackgroundJobOptions>)}' to correctly configure this job");
Guard.For<ArgumentException>(() => options.Value is null, $"Requires a '{nameof(IOptions<CloudEventBackgroundJobOptions>)}' to correctly configure this job");

var messagePumpSettings = serviceProvider.GetRequiredService<AzureServiceBusMessagePumpSettings>();
Guard.NotNull<AzureServiceBusMessagePumpSettings, ArgumentException>(
messagePumpSettings,
$"The '{nameof(serviceProvider)}:{serviceProvider.GetType().Name}' requires to have a non-null '{nameof(AzureServiceBusMessagePumpSettings)}' instance registered");

JobId = options.Value.JobId;
}

/// <summary>
/// Gets the unique identifier for this background job to distinguish this job instance in a multi-instance deployment.
/// </summary>
public string JobId { get; }

/// <summary>
/// Deserializes a raw JSON message body.
/// Tries to parse the given raw <paramref name="message" /> to the contract of the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />.
/// </summary>
/// <param name="rawMessageBody">Raw message body to deserialize</param>
/// <param name="messageContext">Context concerning the message</param>
/// <returns>Deserialized message</returns>
protected override CloudNative.CloudEvents.CloudEvent DeserializeJsonMessageBody(byte[] rawMessageBody, MessageContext messageContext)
/// <param name="message">The raw incoming message that will be tried to parse against the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />'s message contract.</param>
/// <param name="messageType">The type of the message that the message handler can process.</param>
/// <param name="result">The resulted parsed message when the <paramref name="message" /> conforms with the message handlers' contract.</param>
/// <returns>
/// [true] if the <paramref name="message" /> conforms the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />'s contract; otherwise [false].
/// </returns>
public override bool TryDeserializeToMessageFormat(string message, Type messageType, out object result)
{
Guard.NotNull(rawMessageBody, nameof(rawMessageBody), "Cannot deserialize raw JSON body from 'null' input");
Guard.NotAny(rawMessageBody, nameof(rawMessageBody), "Cannot deserialize raw JSON body from empty input");

CloudNative.CloudEvents.CloudEvent cloudEvent = JsonEventFormatter.DecodeStructuredEvent(rawMessageBody);
return cloudEvent;
}

/// <inheritdoc />
protected abstract override Task ProcessMessageAsync(
CloudNative.CloudEvents.CloudEvent message,
AzureServiceBusMessageContext messageContext,
MessageCorrelationInfo correlationInfo,
CancellationToken cancellationToken);

/// <summary>
/// Triggered when the application host is ready to start the service.
/// </summary>
/// <param name="cancellationToken">Indicates that the start process has been aborted.</param>
public override async Task StartAsync(CancellationToken cancellationToken)
{
ServiceBusConnectionStringBuilder serviceBusConnectionString = await GetServiceBusConnectionStringAsync();

Logger.LogTrace("[Job: {JobId}] Creating subscription '{SubscriptionName}' on topic '{TopicPath}'...", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath);
var subscriptionDescription = new SubscriptionDescription(serviceBusConnectionString.EntityPath, Settings.SubscriptionName)
try
{
AutoDeleteOnIdle = TimeSpan.FromHours(1),
MaxDeliveryCount = 3,
UserMetadata = $"Subscription created by Arcus job: '{JobId}' to process inbound CloudEvents."
};

var ruleDescription = new RuleDescription("Accept-All", new TrueFilter());

var serviceBusClient = new ManagementClient(serviceBusConnectionString);
await serviceBusClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription, cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);

Logger.LogTrace("[Job: {JobId}] Subscription '{SubscriptionName}' created on topic '{TopicPath}'", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath);
await serviceBusClient.CloseAsync().ConfigureAwait(continueOnCapturedContext: false);

await base.StartAsync(cancellationToken);
}

/// <summary>
/// Triggered when the application host is performing a graceful shutdown.
/// </summary>
/// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param>
public override async Task StopAsync(CancellationToken cancellationToken)
{
ServiceBusConnectionStringBuilder serviceBusConnectionString = await GetServiceBusConnectionStringAsync();

Logger.LogTrace("[Job: {JobId}] Deleting subscription '{SubscriptionName}' on topic '{TopicPath}'...", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath);
var serviceBusClient = new ManagementClient(serviceBusConnectionString);
await serviceBusClient.DeleteSubscriptionAsync(serviceBusConnectionString.EntityPath, Settings.SubscriptionName, cancellationToken);
Logger.LogTrace("[Job: {JobId}] Subscription '{SubscriptionName}' deleted on topic '{TopicPath}'", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath);
await serviceBusClient.CloseAsync().ConfigureAwait(continueOnCapturedContext: false);

await base.StopAsync(cancellationToken);
}

private async Task<ServiceBusConnectionStringBuilder> GetServiceBusConnectionStringAsync()
{
Logger.LogTrace("[Job: {JobId}] Getting ServiceBus Topic connection string on topic '{TopicPath}'...", JobId, Settings.EntityName);
string connectionString = await Settings.GetConnectionStringAsync();
var serviceBusConnectionBuilder = new ServiceBusConnectionStringBuilder(connectionString);
Logger.LogTrace("[JobId: {JobId}] Got ServiceBus Topic connection string on topic '{TopicPath}'", JobId, Settings.EntityName);
if (messageType == typeof(CloudEvent))
{
CloudEvent cloudEvent = JsonEventFormatter.DecodeStructuredEvent(DefaultEncoding.GetBytes(message));

result = cloudEvent;
return true;
}
}
catch (Exception exception)
{
Logger.LogWarning(exception, "Unable to deserialize the CloudEvent");
}

return serviceBusConnectionBuilder;
result = null;
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System;
using Arcus.BackgroundJobs.CloudEvents;
using Arcus.Messaging.Pumps.ServiceBus;
using Arcus.Messaging.Pumps.ServiceBus.Configuration;
using CloudNative.CloudEvents;
using GuardNet;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;

// ReSharper disable once CheckNamespace
namespace Microsoft.Extensions.DependencyInjection
{
public static class IServiceCollectionExtensions
{
/// <summary>
/// Adds a background job to the <see cref="IServiceCollection"/> to receive <see cref="CloudEvent"/>'s.
/// </summary>
/// <param name="services">The services collection to add the job to.</param>
/// <param name="subscriptionNamePrefix">The name of the Azure Service Bus subscription that will be created to receive <see cref="CloudEvent"/>'s.</param>
/// <param name="serviceBusTopicConnectionStringSecretKey">The configuration key that points to the Azure Service Bus Topic connection string.</param>
public static IServiceCollection AddCloudEventBackgroundJob(
this IServiceCollection services,
string subscriptionNamePrefix,
string serviceBusTopicConnectionStringSecretKey)
{
Guard.NotNull(services, nameof(services));
Guard.NotNullOrWhitespace(subscriptionNamePrefix, nameof(subscriptionNamePrefix), "Requires a non-blank subscription name of the Azure Service Bus Topic subscription, to receive Key Vault events");
Guard.NotNullOrWhitespace(serviceBusTopicConnectionStringSecretKey, nameof(serviceBusTopicConnectionStringSecretKey), "Requires a non-blank configuration key that points to a Azure Service Bus Topic");

services.AddHostedService(serviceProvider =>
{
var settings = new AzureServiceBusMessagePumpSettings(
entityName: null,
subscriptionName: $"{subscriptionNamePrefix}.{Guid.NewGuid().ToString()}",
ServiceBusEntity.Topic,
getConnectionStringFromConfigurationFunc: null,
getConnectionStringFromSecretFunc: secretProvider => secretProvider.GetRawSecretAsync(serviceBusTopicConnectionStringSecretKey),
options: new AzureServiceBusMessagePumpConfiguration(AzureServiceBusTopicMessagePumpOptions.Default),
serviceProvider: serviceProvider);

var configuration = serviceProvider.GetRequiredService<IConfiguration>();
var logger = serviceProvider.GetRequiredService<ILogger<AzureServiceBusMessagePump>>();

return new CloudEventBackgroundJob(settings, configuration, serviceProvider, logger);
});

return services;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Arcus.EventGrid" Version="3.0.0-preview-1" />
<PackageReference Include="Arcus.EventGrid" Version="3.0.0" />
<PackageReference Include="Arcus.Security.Core" Version="1.1.0" />
<PackageReference Include="Arcus.Security.Providers.AzureKeyVault" Version="1.1.0" />
<PackageReference Include="Microsoft.Azure.EventGrid" Version="3.2.0" />
<PackageReference Include="Microsoft.Azure.KeyVault.Core" Version="3.0.4" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Arcus.BackgroundJobs.CloudEvents\Arcus.BackgroundJobs.CloudEvents.csproj" />
</ItemGroup>

</Project>

This file was deleted.

Loading