Skip to content

Commit 8cbd9eb

Browse files
feat: change circuitbreaker approach (#453)
* processing spike * code cleanup * pr-sug: use message processing result io boolean * pr-sug: promote circuit breaker state enum to class * pr-fix: throw-if-null is not available in net-standard * pr-fix: correct usings in az service bus message pump * pr-sug: add message id context to the processing result * pr-fix: correct time-out for resiliency tests * pr-fix: remove useless dev-test * pr-fix: correct recieved message creation in unit tests * pr-fix: more stable post-assertion resilence * pr-fix: use back the message id for the message processing result * pr-sug: finishing touches on circuit breaker state transitioning * pr-fix: streamline equalization in circuit breaker state * pr-fix: let router abbandon message io circuit breaker handler * pr-sug: rename wait interval method + fix wait recovery period log * pr-fix: complete renaming in message pump * pr-sug: use transition method for open state * pr-sug: add half-open state boolean flag * pr-fix: limit processing of single message on queue * Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs Co-authored-by: Frederik Gheysels <[email protected]> * pr-sug: reframe summary and remarks wording in circuit breaker states * Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs Co-authored-by: Frederik Gheysels <[email protected]> * Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs Co-authored-by: Frederik Gheysels <[email protected]> * Update src/Arcus.Messaging.Pumps.Abstractions/Resiliency/IMessagePumpCircuitBreaker.cs --------- Co-authored-by: stijnmoreels <[email protected]>
1 parent 1c8eb1a commit 8cbd9eb

File tree

12 files changed

+454
-410
lines changed

12 files changed

+454
-410
lines changed

src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/AzureServiceBusMessageRouter.cs

Lines changed: 80 additions & 61 deletions
Large diffs are not rendered by default.

src/Arcus.Messaging.Abstractions.ServiceBus/MessageHandling/IAzureServiceBusMessageRouter.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter
1313
{
1414
/// <summary>
1515
/// Handle a new <paramref name="message"/> that was received by routing them through registered <see cref="IAzureServiceBusMessageHandler{TMessage}"/>s
16-
/// and optionally through an registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
16+
/// and optionally through a registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
1717
/// if none of the message handlers were able to process the <paramref name="message"/>.
1818
/// </summary>
1919
/// <param name="message">The incoming message that needs to be routed through registered message handlers.</param>
@@ -29,15 +29,15 @@ public interface IAzureServiceBusMessageRouter : IMessageRouter
2929
/// Thrown when the <paramref name="message"/>, <paramref name="messageContext"/>, or <paramref name="correlationInfo"/> is <c>null</c>.
3030
/// </exception>
3131
/// <exception cref="InvalidOperationException">Thrown when no message handlers or none matching message handlers are found to process the message.</exception>
32-
Task RouteMessageAsync(
32+
Task<MessageProcessingResult> RouteMessageAsync(
3333
ServiceBusReceivedMessage message,
3434
AzureServiceBusMessageContext messageContext,
3535
MessageCorrelationInfo correlationInfo,
3636
CancellationToken cancellationToken);
3737

3838
/// <summary>
3939
/// Handle a new <paramref name="message"/> that was received by routing them through registered <see cref="IAzureServiceBusMessageHandler{TMessage}"/>s
40-
/// and optionally through an registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
40+
/// and optionally through a registered <see cref="IFallbackMessageHandler"/> or <see cref="IAzureServiceBusFallbackMessageHandler"/>
4141
/// if none of the message handlers were able to process the <paramref name="message"/>.
4242
/// </summary>
4343
/// <param name="messageReceiver">
@@ -52,7 +52,7 @@ Task RouteMessageAsync(
5252
/// Thrown when the <paramref name="messageReceiver"/>, <paramref name="message"/>, <paramref name="messageContext"/>, or <paramref name="correlationInfo"/> is <c>null</c>.
5353
/// </exception>
5454
/// <exception cref="InvalidOperationException">Thrown when no message handlers or none matching message handlers are found to process the message.</exception>
55-
Task RouteMessageAsync(
55+
Task<MessageProcessingResult> RouteMessageAsync(
5656
ServiceBusReceiver messageReceiver,
5757
ServiceBusReceivedMessage message,
5858
AzureServiceBusMessageContext messageContext,
Lines changed: 68 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,49 @@
11
using System;
2-
using GuardNet;
32

43
namespace Arcus.Messaging.Abstractions.MessageHandling
54
{
5+
/// <summary>
6+
/// Represents all the possible errors of a <see cref="MessageProcessingResult"/>.
7+
/// </summary>
8+
public enum MessageProcessingError
9+
{
10+
/// <summary>
11+
/// Defines an error that shows that the message processing was interrupted by some external cause,
12+
/// unrelated to the message routing.
13+
/// </summary>
14+
ProcessingInterrupted,
15+
16+
/// <summary>
17+
/// Defines an error shows that no <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation was found
18+
/// that was able to process the received message.
19+
/// </summary>
20+
CannotFindMatchedHandler,
21+
22+
/// <summary>
23+
/// Defines and error that shows that the matched <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation
24+
/// was unable to process the received message.
25+
/// </summary>
26+
MatchedHandlerFailed,
27+
}
28+
629
/// <summary>
730
/// Represents an outcome of a message that was processed by an <see cref="IMessageHandler{TMessage,TMessageContext}"/> implementation.
831
/// </summary>
932
public class MessageProcessingResult
1033
{
11-
private MessageProcessingResult()
34+
private MessageProcessingResult(string messageId)
1235
{
36+
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
1337
IsSuccessful = true;
1438
}
1539

16-
private MessageProcessingResult(Exception processingException)
40+
private MessageProcessingResult(string messageId, MessageProcessingError error, string errorMessage, Exception processingException)
1741
{
18-
Guard.NotNull(processingException, nameof(processingException));
19-
20-
IsSuccessful = false;
42+
MessageId = messageId ?? throw new ArgumentNullException(nameof(messageId));
43+
Error = error;
44+
ErrorMessage = errorMessage;
2145
ProcessingException = processingException;
46+
IsSuccessful = false;
2247
}
2348

2449
/// <summary>
@@ -27,27 +52,57 @@ private MessageProcessingResult(Exception processingException)
2752
public bool IsSuccessful { get; }
2853

2954
/// <summary>
30-
/// Gets the exception that occurred during the message processing that represents the cause of the processing failure.
55+
/// Gets the unique ID to identify the message for which this is a processing result.
56+
/// </summary>
57+
public string MessageId { get; }
58+
59+
/// <summary>
60+
/// Gets the error type that shows which kind of error the message processing failed.
61+
/// </summary>
62+
/// <remarks>
63+
/// Only available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
64+
/// </remarks>
65+
public MessageProcessingError Error { get; }
66+
67+
/// <summary>
68+
/// Gets the description that explains the context of the <see cref="Error"/>.
3169
/// </summary>
3270
/// <remarks>
3371
/// Only available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
3472
/// </remarks>
73+
public string ErrorMessage { get; }
74+
75+
/// <summary>
76+
/// Gets the exception that occurred during the message processing that represents the cause of the processing failure.
77+
/// </summary>
78+
/// <remarks>
79+
/// Only possibly available when this processing result represents an unsuccessful message processing result - when <see cref="IsSuccessful"/> is <c>false</c>.
80+
/// </remarks>
3581
public Exception ProcessingException { get; }
3682

3783
/// <summary>
3884
/// Gets an <see cref="MessageProcessingResult"/> instance that represents a result of a message was successfully processed.
3985
/// </summary>
40-
public static MessageProcessingResult Success => new MessageProcessingResult();
86+
public static MessageProcessingResult Success(string messageId) => new(messageId);
87+
88+
/// <summary>
89+
/// Creates an <see cref="MessageProcessingResult"/> instance that represents a result of a message that was unsuccessfully processed.
90+
/// </summary>
91+
public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage)
92+
{
93+
return new MessageProcessingResult(messageId, error, errorMessage, processingException: null);
94+
}
4195

4296
/// <summary>
4397
/// Creates an <see cref="MessageProcessingResult"/> instance that represents a result of a message that was unsuccessfully processed.
4498
/// </summary>
45-
/// <param name="processingException">The exception that occurred during the message processing that represents the cause of the processing failure.</param>
46-
/// <exception cref="ArgumentException">Thrown when the <paramref name="processingException"/> is blank.</exception>
47-
public static MessageProcessingResult Failure(Exception processingException)
99+
public static MessageProcessingResult Failure(string messageId, MessageProcessingError error, string errorMessage, Exception processingException)
48100
{
49-
Guard.NotNull(processingException, nameof(processingException));
50-
return new MessageProcessingResult(processingException);
101+
return new MessageProcessingResult(
102+
messageId,
103+
error,
104+
errorMessage,
105+
processingException ?? throw new ArgumentNullException(nameof(processingException)));
51106
}
52107
}
53108
}

src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs

Lines changed: 46 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
using System.Text;
33
using System.Threading;
44
using System.Threading.Tasks;
5-
using Arcus.Messaging.Abstractions.MessageHandling;
65
using Arcus.Messaging.Pumps.Abstractions.Resiliency;
76
using GuardNet;
87
using Microsoft.Extensions.Configuration;
@@ -11,27 +10,6 @@
1110

1211
namespace Arcus.Messaging.Pumps.Abstractions
1312
{
14-
/// <summary>
15-
/// Represents the available states in which the <see cref="MessagePump"/> is presently in within the circuit breaker context
16-
/// </summary>
17-
public enum MessagePumpCircuitState
18-
{
19-
/// <summary>
20-
/// The message pump is able to receive messages.
21-
/// </summary>
22-
Closed,
23-
24-
/// <summary>
25-
/// The message pump is under inspection if it can receive messages.
26-
/// </summary>
27-
HalfOpen,
28-
29-
/// <summary>
30-
/// The message pump is unable to receive messages.
31-
/// </summary>
32-
Open
33-
}
34-
3513
/// <summary>
3614
/// Represents the foundation for building message pumps.
3715
/// </summary>
@@ -123,18 +101,6 @@ public override async Task StopAsync(CancellationToken cancellationToken)
123101
await base.StopAsync(cancellationToken);
124102
}
125103

126-
/// <summary>
127-
/// Try to process a single message after the circuit was broken, a.k.a entering the half-open state.
128-
/// </summary>
129-
/// <returns>
130-
/// [Success] when the related message handler can again process messages and the message pump can again start receive messages in full; [Failure] otherwise.
131-
/// </returns>
132-
public virtual Task<MessageProcessingResult> TryProcessProcessSingleMessageAsync(MessagePumpCircuitBreakerOptions options)
133-
{
134-
CircuitState = MessagePumpCircuitState.HalfOpen;
135-
return Task.FromResult(MessageProcessingResult.Success);
136-
}
137-
138104
/// <summary>
139105
/// Start with receiving messages on this message pump.
140106
/// </summary>
@@ -154,11 +120,56 @@ public virtual Task StartProcessingMessagesAsync(CancellationToken cancellationT
154120
public virtual Task StopProcessingMessagesAsync(CancellationToken cancellationToken)
155121
{
156122
IsStarted = false;
157-
CircuitState = MessagePumpCircuitState.Open;
123+
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open);
158124

159125
return Task.CompletedTask;
160126
}
161127

128+
/// <summary>
129+
/// Waits a previously configured amount of time until the message pump is expected to be recovered (Closed to Open state).
130+
/// </summary>
131+
/// <param name="cancellationToken">The token to cancel the wait period.</param>
132+
protected async Task WaitMessageRecoveryPeriodAsync(CancellationToken cancellationToken)
133+
{
134+
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message recovery period of '{Recovery}' during '{State}' state", JobId, CircuitState.Options.MessageRecoveryPeriod.ToString("g"), CircuitState);
135+
await Task.Delay(CircuitState.Options.MessageRecoveryPeriod, cancellationToken);
136+
137+
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
138+
}
139+
140+
/// <summary>
141+
/// Waits a previously configured amount of time until the next single message can be tried (Half-Open state).
142+
/// </summary>
143+
/// <param name="cancellationToken">The token to cancel the wait period.</param>
144+
protected async Task WaitMessageIntervalDuringRecoveryAsync(CancellationToken cancellationToken)
145+
{
146+
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to wait message interval during recovery of '{Interval}' during the '{State}' state", JobId, CircuitState.Options.MessageIntervalDuringRecovery.ToString("g"), CircuitState);
147+
await Task.Delay(CircuitState.Options.MessageIntervalDuringRecovery, cancellationToken);
148+
149+
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.HalfOpen);
150+
}
151+
152+
/// <summary>
153+
/// Notifies the message pump about the new state which pauses message retrieval.
154+
/// </summary>
155+
/// <param name="options">The additional accompanied options that goes with the new state.</param>
156+
internal void NotifyPauseReceiveMessages(MessagePumpCircuitBreakerOptions options)
157+
{
158+
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition from a '{CurrentState}' an 'Open' state", JobId, CircuitState);
159+
160+
CircuitState = CircuitState.TransitionTo(CircuitBreakerState.Open, options);
161+
}
162+
163+
/// <summary>
164+
/// Notifies the message pump about the new state which resumes message retrieval.
165+
/// </summary>
166+
protected void NotifyResumeRetrievingMessages()
167+
{
168+
Logger.LogDebug("Circuit breaker caused message pump '{JobId}' to transition back from '{CurrentState}' to a 'Closed' state, retrieving messages is resumed", JobId, CircuitState);
169+
170+
CircuitState = MessagePumpCircuitState.Closed;
171+
}
172+
162173
/// <summary>
163174
/// Register information about the client connected to the messaging service
164175
/// </summary>
Lines changed: 9 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
using System;
22
using System.Linq;
3-
using System.Threading;
43
using System.Threading.Tasks;
5-
using Arcus.Messaging.Abstractions.MessageHandling;
64
using GuardNet;
75
using Microsoft.Extensions.DependencyInjection;
86
using Microsoft.Extensions.Hosting;
@@ -40,70 +38,28 @@ public DefaultMessagePumpCircuitBreaker(IServiceProvider serviceProvider, ILogge
4038
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
4139
/// <param name="configureOptions">The optional user-configurable options to manipulate the workings of the message pump interaction.</param>
4240
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
43-
public virtual async Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
41+
public virtual Task PauseMessageProcessingAsync(string jobId, Action<MessagePumpCircuitBreakerOptions> configureOptions)
4442
{
4543
Guard.NotNullOrWhitespace(jobId, nameof(jobId));
4644

47-
var options = new MessagePumpCircuitBreakerOptions();
48-
configureOptions?.Invoke(options);
49-
5045
MessagePump messagePump = GetRegisteredMessagePump(jobId);
5146

5247
if (!messagePump.IsStarted)
5348
{
54-
_logger.LogWarning($"Cannot pause MessagePump for JobId {jobId} because the MessagePump has not been started.");
55-
return;
49+
_logger.LogWarning("Cannot pause message pump '{JobId}' because the pump has not been started", jobId);
50+
return Task.CompletedTask;
5651
}
5752

58-
if (messagePump.CircuitState != MessagePumpCircuitState.Closed)
59-
{
60-
_logger.LogWarning($"Cannot pause MessagePump for JobId {jobId} because the MessagePump's circuitbreaker is not in a closed state.");
61-
return;
62-
}
63-
64-
_logger.LogDebug("Open circuit by pausing message processing for message pump '{JobId}'...", jobId);
65-
await messagePump.StopProcessingMessagesAsync(CancellationToken.None);
66-
67-
await Task.Factory.StartNew(async () =>
68-
{
69-
await WaitRecoveryTimeAsync(messagePump, options);
70-
71-
MessageProcessingResult result;
72-
do
73-
{
74-
result = await TryProcessSingleMessageAsync(messagePump, options);
75-
76-
if (!result.IsSuccessful)
77-
{
78-
await WaitMessageIntervalDuringRecoveryAsync(messagePump, options);
79-
}
80-
81-
} while (!result.IsSuccessful);
82-
83-
await ResumeMessageProcessingAsync(jobId);
84-
}, TaskCreationOptions.LongRunning);
85-
}
86-
87-
/// <summary>
88-
/// Continue the process of receiving messages in the message pump after a successful message handling.
89-
/// </summary>
90-
/// <param name="jobId">The unique identifier to distinguish the message pump in the application services.</param>
91-
/// <exception cref="ArgumentException">Thrown when the <paramref name="jobId"/> is blank.</exception>
92-
public virtual async Task ResumeMessageProcessingAsync(string jobId)
93-
{
94-
Guard.NotNullOrWhitespace(jobId, nameof(jobId));
95-
96-
MessagePump messagePump = GetRegisteredMessagePump(jobId);
97-
98-
if (messagePump.IsStarted)
53+
if (!messagePump.CircuitState.IsClosed)
9954
{
100-
_logger.LogWarning("Resume called on Message pump '{JobId}' but Message pump is already started. CircuitState = {CircuitState}", jobId, messagePump.CircuitState);
101-
return;
55+
return Task.CompletedTask;
10256
}
10357

104-
_logger.LogInformation("Message pump '{JobId}' successfully handled a single message, resume message processing (circuit breaker: closed)", messagePump.JobId);
105-
await messagePump.StartProcessingMessagesAsync(CancellationToken.None);
58+
var options = new MessagePumpCircuitBreakerOptions();
59+
configureOptions?.Invoke(options);
10660

61+
messagePump.NotifyPauseReceiveMessages(options);
62+
return Task.CompletedTask;
10763
}
10864

10965
/// <summary>
@@ -148,23 +104,5 @@ protected MessagePump GetRegisteredMessagePump(string jobId)
148104

149105
return messagePumps[0];
150106
}
151-
152-
private async Task<MessageProcessingResult> TryProcessSingleMessageAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
153-
{
154-
_logger.LogDebug("Try to process single message in message pump '{JobId}' (state: half-open)", messagePump.JobId);
155-
return await messagePump.TryProcessProcessSingleMessageAsync(options);
156-
}
157-
158-
private async Task WaitMessageIntervalDuringRecoveryAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
159-
{
160-
_logger.LogDebug("Wait configured interval period ({IntervalPeriod}) since message pump '{JobId}' failed to handle a single message (circuit breaker: open)", options.MessageIntervalDuringRecovery, messagePump.JobId);
161-
await Task.Delay(options.MessageIntervalDuringRecovery);
162-
}
163-
164-
private async Task WaitRecoveryTimeAsync(MessagePump messagePump, MessagePumpCircuitBreakerOptions options)
165-
{
166-
_logger.LogDebug("Wait configured recovery period ({RecoveryPeriod}) since message pump '{JobId}' failed to process messages (circuit breaker: open)", options.MessageRecoveryPeriod, messagePump.JobId);
167-
await Task.Delay(options.MessageRecoveryPeriod);
168-
}
169107
}
170108
}

0 commit comments

Comments
 (0)