|
1 | 1 | using System;
|
2 |
| -using System.Threading; |
3 |
| -using System.Threading.Tasks; |
4 |
| -using Arcus.Messaging.Abstractions; |
5 | 2 | using Arcus.Messaging.Pumps.ServiceBus;
|
| 3 | +using Arcus.Messaging.Pumps.ServiceBus.Configuration; |
6 | 4 | using CloudNative.CloudEvents;
|
7 |
| -using GuardNet; |
8 |
| -using Microsoft.Azure.ServiceBus; |
9 |
| -using Microsoft.Azure.ServiceBus.Management; |
10 | 5 | using Microsoft.Extensions.Configuration;
|
11 |
| -using Microsoft.Extensions.DependencyInjection; |
12 | 6 | using Microsoft.Extensions.Logging;
|
13 |
| -using Microsoft.Extensions.Options; |
14 | 7 |
|
15 | 8 | namespace Arcus.BackgroundJobs.CloudEvents
|
16 | 9 | {
|
17 | 10 | /// <summary>
|
18 | 11 | /// Representing a Azure Service Bus Topic message pump that will create and delete a Service Bus Topic subscription during the lifetime of the pump.
|
19 | 12 | /// </summary>
|
20 |
| - public abstract class CloudEventBackgroundJob : AzureServiceBusMessagePump<CloudNative.CloudEvents.CloudEvent> |
| 13 | + public class CloudEventBackgroundJob : AzureServiceBusMessagePump |
21 | 14 | {
|
22 | 15 | private static readonly JsonEventFormatter JsonEventFormatter = new JsonEventFormatter();
|
23 | 16 |
|
24 | 17 | /// <summary>
|
25 | 18 | /// Initializes a new instance of the <see cref="CloudEventBackgroundJob"/> class.
|
26 | 19 | /// </summary>
|
| 20 | + /// <param name="settings">The settings to influence the behavior of the message pump.</param> |
27 | 21 | /// <param name="configuration">The configuration of the application.</param>
|
28 | 22 | /// <param name="serviceProvider">The collection of services that are configured.</param>
|
29 |
| - /// <param name="options">The options to further configure this job.</param> |
30 | 23 | /// <param name="logger">The logger to write telemetry to.</param>
|
31 | 24 | /// <exception cref="ArgumentNullException">When the <paramref name="serviceProvider"/> is <c>null</c>.</exception>
|
32 | 25 | /// <exception cref="ArgumentException">When the <paramref name="serviceProvider"/> doesn't have a registered <see cref="AzureServiceBusMessagePumpSettings"/> instance.</exception>
|
33 |
| - protected CloudEventBackgroundJob( |
| 26 | + public CloudEventBackgroundJob( |
| 27 | + AzureServiceBusMessagePumpSettings settings, |
34 | 28 | IConfiguration configuration,
|
35 | 29 | IServiceProvider serviceProvider,
|
36 |
| - IOptions<CloudEventBackgroundJobOptions> options, |
37 |
| - ILogger logger) : base(configuration, serviceProvider, logger) |
| 30 | + ILogger<AzureServiceBusMessagePump> logger) |
| 31 | + : base(settings, configuration, serviceProvider, logger) |
38 | 32 | {
|
39 |
| - Guard.NotNull( |
40 |
| - serviceProvider, |
41 |
| - nameof(serviceProvider), |
42 |
| - $"Requires a '{nameof(IServiceProvider)}' implementation to retrieve the '{nameof(AzureServiceBusMessagePumpSettings)}'"); |
43 |
| - Guard.NotNull(options, nameof(options), $"Requires a '{nameof(IOptions<CloudEventBackgroundJobOptions>)}' to correctly configure this job"); |
44 |
| - Guard.For<ArgumentException>(() => options.Value is null, $"Requires a '{nameof(IOptions<CloudEventBackgroundJobOptions>)}' to correctly configure this job"); |
45 |
| - |
46 |
| - var messagePumpSettings = serviceProvider.GetRequiredService<AzureServiceBusMessagePumpSettings>(); |
47 |
| - Guard.NotNull<AzureServiceBusMessagePumpSettings, ArgumentException>( |
48 |
| - messagePumpSettings, |
49 |
| - $"The '{nameof(serviceProvider)}:{serviceProvider.GetType().Name}' requires to have a non-null '{nameof(AzureServiceBusMessagePumpSettings)}' instance registered"); |
50 |
| - |
51 |
| - JobId = options.Value.JobId; |
52 | 33 | }
|
53 | 34 |
|
54 | 35 | /// <summary>
|
55 |
| - /// Gets the unique identifier for this background job to distinguish this job instance in a multi-instance deployment. |
56 |
| - /// </summary> |
57 |
| - public string JobId { get; } |
58 |
| - |
59 |
| - /// <summary> |
60 |
| - /// Deserializes a raw JSON message body. |
| 36 | + /// Tries to parse the given raw <paramref name="message" /> to the contract of the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />. |
61 | 37 | /// </summary>
|
62 |
| - /// <param name="rawMessageBody">Raw message body to deserialize</param> |
63 |
| - /// <param name="messageContext">Context concerning the message</param> |
64 |
| - /// <returns>Deserialized message</returns> |
65 |
| - protected override CloudNative.CloudEvents.CloudEvent DeserializeJsonMessageBody(byte[] rawMessageBody, MessageContext messageContext) |
| 38 | + /// <param name="message">The raw incoming message that will be tried to parse against the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />'s message contract.</param> |
| 39 | + /// <param name="messageType">The type of the message that the message handler can process.</param> |
| 40 | + /// <param name="result">The resulted parsed message when the <paramref name="message" /> conforms with the message handlers' contract.</param> |
| 41 | + /// <returns> |
| 42 | + /// [true] if the <paramref name="message" /> conforms the <see cref="T:Arcus.Messaging.Pumps.Abstractions.MessageHandling.IMessageHandler`2" />'s contract; otherwise [false]. |
| 43 | + /// </returns> |
| 44 | + public override bool TryDeserializeToMessageFormat(string message, Type messageType, out object result) |
66 | 45 | {
|
67 |
| - Guard.NotNull(rawMessageBody, nameof(rawMessageBody), "Cannot deserialize raw JSON body from 'null' input"); |
68 |
| - Guard.NotAny(rawMessageBody, nameof(rawMessageBody), "Cannot deserialize raw JSON body from empty input"); |
69 |
| - |
70 |
| - CloudNative.CloudEvents.CloudEvent cloudEvent = JsonEventFormatter.DecodeStructuredEvent(rawMessageBody); |
71 |
| - return cloudEvent; |
72 |
| - } |
73 |
| - |
74 |
| - /// <inheritdoc /> |
75 |
| - protected abstract override Task ProcessMessageAsync( |
76 |
| - CloudNative.CloudEvents.CloudEvent message, |
77 |
| - AzureServiceBusMessageContext messageContext, |
78 |
| - MessageCorrelationInfo correlationInfo, |
79 |
| - CancellationToken cancellationToken); |
80 |
| - |
81 |
| - /// <summary> |
82 |
| - /// Triggered when the application host is ready to start the service. |
83 |
| - /// </summary> |
84 |
| - /// <param name="cancellationToken">Indicates that the start process has been aborted.</param> |
85 |
| - public override async Task StartAsync(CancellationToken cancellationToken) |
86 |
| - { |
87 |
| - ServiceBusConnectionStringBuilder serviceBusConnectionString = await GetServiceBusConnectionStringAsync(); |
88 |
| - |
89 |
| - Logger.LogTrace("[Job: {JobId}] Creating subscription '{SubscriptionName}' on topic '{TopicPath}'...", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath); |
90 |
| - var subscriptionDescription = new SubscriptionDescription(serviceBusConnectionString.EntityPath, Settings.SubscriptionName) |
| 46 | + try |
91 | 47 | {
|
92 |
| - AutoDeleteOnIdle = TimeSpan.FromHours(1), |
93 |
| - MaxDeliveryCount = 3, |
94 |
| - UserMetadata = $"Subscription created by Arcus job: '{JobId}' to process inbound CloudEvents." |
95 |
| - }; |
96 |
| - |
97 |
| - var ruleDescription = new RuleDescription("Accept-All", new TrueFilter()); |
98 |
| - |
99 |
| - var serviceBusClient = new ManagementClient(serviceBusConnectionString); |
100 |
| - await serviceBusClient.CreateSubscriptionAsync(subscriptionDescription, ruleDescription, cancellationToken) |
101 |
| - .ConfigureAwait(continueOnCapturedContext: false); |
102 |
| - |
103 |
| - Logger.LogTrace("[Job: {JobId}] Subscription '{SubscriptionName}' created on topic '{TopicPath}'", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath); |
104 |
| - await serviceBusClient.CloseAsync().ConfigureAwait(continueOnCapturedContext: false); |
105 |
| - |
106 |
| - await base.StartAsync(cancellationToken); |
107 |
| - } |
108 |
| - |
109 |
| - /// <summary> |
110 |
| - /// Triggered when the application host is performing a graceful shutdown. |
111 |
| - /// </summary> |
112 |
| - /// <param name="cancellationToken">Indicates that the shutdown process should no longer be graceful.</param> |
113 |
| - public override async Task StopAsync(CancellationToken cancellationToken) |
114 |
| - { |
115 |
| - ServiceBusConnectionStringBuilder serviceBusConnectionString = await GetServiceBusConnectionStringAsync(); |
116 |
| - |
117 |
| - Logger.LogTrace("[Job: {JobId}] Deleting subscription '{SubscriptionName}' on topic '{TopicPath}'...", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath); |
118 |
| - var serviceBusClient = new ManagementClient(serviceBusConnectionString); |
119 |
| - await serviceBusClient.DeleteSubscriptionAsync(serviceBusConnectionString.EntityPath, Settings.SubscriptionName, cancellationToken); |
120 |
| - Logger.LogTrace("[Job: {JobId}] Subscription '{SubscriptionName}' deleted on topic '{TopicPath}'", JobId, Settings.SubscriptionName, serviceBusConnectionString.EntityPath); |
121 |
| - await serviceBusClient.CloseAsync().ConfigureAwait(continueOnCapturedContext: false); |
122 |
| - |
123 |
| - await base.StopAsync(cancellationToken); |
124 |
| - } |
125 |
| - |
126 |
| - private async Task<ServiceBusConnectionStringBuilder> GetServiceBusConnectionStringAsync() |
127 |
| - { |
128 |
| - Logger.LogTrace("[Job: {JobId}] Getting ServiceBus Topic connection string on topic '{TopicPath}'...", JobId, Settings.EntityName); |
129 |
| - string connectionString = await Settings.GetConnectionStringAsync(); |
130 |
| - var serviceBusConnectionBuilder = new ServiceBusConnectionStringBuilder(connectionString); |
131 |
| - Logger.LogTrace("[JobId: {JobId}] Got ServiceBus Topic connection string on topic '{TopicPath}'", JobId, Settings.EntityName); |
| 48 | + if (messageType == typeof(CloudEvent)) |
| 49 | + { |
| 50 | + CloudEvent cloudEvent = JsonEventFormatter.DecodeStructuredEvent(DefaultEncoding.GetBytes(message)); |
| 51 | + |
| 52 | + result = cloudEvent; |
| 53 | + return true; |
| 54 | + } |
| 55 | + } |
| 56 | + catch (Exception exception) |
| 57 | + { |
| 58 | + Logger.LogWarning(exception, "Unable to deserialize the CloudEvent"); |
| 59 | + } |
132 | 60 |
|
133 |
| - return serviceBusConnectionBuilder; |
| 61 | + result = null; |
| 62 | + return false; |
134 | 63 | }
|
135 | 64 | }
|
136 | 65 | }
|
0 commit comments