Skip to content
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
2481456
Upgrade packages
fgheysels Jun 26, 2024
54f4eda
update package
fgheysels Jun 26, 2024
42a9e0e
package updates
fgheysels Jun 26, 2024
0508afb
remove duplicate package ref
fgheysels Jun 26, 2024
9b8d8a2
detach eventhandlers before stopping ServiceBusProcessor
fgheysels Jun 26, 2024
0265f06
Merge branch 'main' into frgh/fix/433_unable_to_stop_processor
stijnmoreels Jun 27, 2024
25d673b
pr-fix: fully use message receiver
stijnmoreels Jun 27, 2024
56a4782
pr-fix: use newest features in messaging package
stijnmoreels Jun 27, 2024
93fef19
pr-fix: update az packages + message pump accessibility check
stijnmoreels Jun 27, 2024
dc56063
pr-fix: set is-started in receive messages
stijnmoreels Jun 27, 2024
aad066f
pr-fix: correct accessibility on message pump
stijnmoreels Jun 27, 2024
ba7c04a
pr-fix: introduce resume functionality
stijnmoreels Jun 28, 2024
ffe4a35
pr-fix: mark the interval processing as long-running
stijnmoreels Jun 28, 2024
dd64660
pr-fix: intro circuit breaker message handler
stijnmoreels Jul 2, 2024
00ea243
pr-fix: correct worker logging
stijnmoreels Jul 2, 2024
50ca17f
pr-fix: stabelize with file system as event source
stijnmoreels Jul 2, 2024
1688fa1
temp commit
stijnmoreels Jul 9, 2024
a3ab3b0
Merge branch 'main' into frgh/fix/433_unable_to_stop_processor
stijnmoreels Aug 27, 2024
54b805b
pr-sug: use private set for circuit breaker state
stijnmoreels Aug 27, 2024
b0cf15c
pr-fix: correctly auto-complete message
stijnmoreels Sep 19, 2024
0acdbc3
pr-fix: add additional test verifications
stijnmoreels Sep 20, 2024
ff876b9
Update src/Arcus.Messaging.Pumps.Abstractions/MessagePump.cs
stijnmoreels Oct 7, 2024
1579dce
pr-fix: auto-dead-letter when no message handler can process message
stijnmoreels Oct 7, 2024
fc2482c
pr-fix: add exception handling for fallback registrations
stijnmoreels Oct 7, 2024
3283726
Merge branch 'frgh/fix/433_unable_to_stop_processor' of https://githu…
stijnmoreels Oct 7, 2024
6156541
pr-fix: correct receiver tests
stijnmoreels Oct 7, 2024
8a79b2f
Merge branch 'main' into frgh/fix/433_unable_to_stop_processor
stijnmoreels Oct 7, 2024
d3c81b7
pr-fix: receive on sensors
stijnmoreels Oct 7, 2024
1f8be4e
pr-fix: auto-abandon & -dead-letter in router
stijnmoreels Oct 14, 2024
7e55207
pr-fix: null-check and updated unit tests
stijnmoreels Oct 14, 2024
1aeb28b
pr-fix: transient complete + safeguard missing message
stijnmoreels Nov 13, 2024
118b564
pr-fix: correct namespace connection string
stijnmoreels Nov 13, 2024
490f39e
pr-fix: use managed identity
stijnmoreels Nov 13, 2024
903ae92
pr-fix: skip for now unavailable system tests
stijnmoreels Nov 13, 2024
bfff8ae
pr-fix: use dedicated namespace connection string
stijnmoreels Nov 14, 2024
ca11de4
Merge branch 'main' into frgh/fix/433_unable_to_stop_processor
stijnmoreels Nov 14, 2024
fc0040d
pr-fix: correct appsettings.json
stijnmoreels Nov 14, 2024
7377c0f
pr-fix: remove delay 1 day
stijnmoreels Nov 14, 2024
5213bf5
pr-fix: activate circuit-breaker tests
stijnmoreels Nov 15, 2024
09e8924
pr-sug: rename get circuit-breaker state expose method
stijnmoreels Nov 20, 2024
23520db
Improve circuit-breaker handling
fgheysels Nov 25, 2024
6578968
Merge branch 'frgh/fix/433_unable_to_stop_processor' of https://githu…
fgheysels Nov 25, 2024
325c68f
safeguards in pause
fgheysels Nov 25, 2024
5107ea7
remove unused private
fgheysels Nov 25, 2024
ee8fc92
fix compiler warnings
fgheysels Nov 25, 2024
15de5b2
fix test
fgheysels Nov 25, 2024
1c8eb1a
code layout
fgheysels Nov 25, 2024
8cbd9eb
feat: change circuitbreaker approach (#453)
fgheysels Dec 10, 2024
d5cfabf
pr-sug: correct is started description
stijnmoreels Dec 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 31 additions & 20 deletions docs/preview/02-Features/06-general-messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,30 @@ To interact with the message processing system within your custom message handle
using Arcus.Messaging.Abstractions.ServiceBus.MessageHandling;
using Arcus.Messaging.Pumps.Abstractions.Resiliency;

public class OrderMessageHandler : IAzureServiceBusMessageHandler<Order>
public class OrderMessageHandler : CircuitBreakerServiceBusMessageHandler<Order>
{
private readonly IMessagePumpCircuitBreaker _circuitBreaker;

public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker)
public OrderMessageHandler(IMessagePumpCircuitBreaker circuitBreaker, ILogger<...> logger) : base(circuitBreaker, logger)
{
_circuitBreaker = circuitBreaker;
}

public async Task ProcessMessageAsync(Order message, AzureServiceBusMessageContext messageContext, ...)
public override async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessagePumpCircuitBreakerOptions options,
...)
{
// Determine whether your dependent system is healthy...

// If not, call the circuit breaker, processing will be halted temporarily.
await _circuitBreaker.PauseMessageProcessingAsync(messageContext.JobId);
if (!IsDependentSystemHealthy())
{
throw new ...Exception("My dependency system is temporarily unavailable, please halt message processing for now");
}
else
{
// Process your message...
}
}
}
```
Expand All @@ -47,24 +56,26 @@ The message pump will by default act in the following pattern:
* Circuit breaker calls `Pause`
* Message pump stops processing messages for a period of time (circuit is OPEN).
* Message pump tries processing a single message (circuit is HALF-OPEN).
* Dependency still unhealthy? => Tries again later (circuit is OPEN)
* Dependency healthy? => Message pump starts receiving message in full again (circuit is CLOSED).
* Dependency still unhealthy? => circuit breaker pauses again (circuit is OPEN)
* Dependency healthy? => circuit breaker resumes, message pump starts receiving message in full again (circuit is CLOSED).

Both the recovery period after the circuit is open and the interval between messages when the circuit is half-open is configurable when calling the circuit breaker. These time periods are related to your dependent system and could change by the type of transient connection failure.

```csharp
await _circuitBreaker.PauseMessageProcessingAsync(
messageContext.JobId,
options =>
{
// Sets the time period the circuit breaker should wait before retrying to receive messages.
// A.k.a. the time period the circuit is closed (default: 30 seconds).
options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15);

// Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery.
// A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds).
options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5);
});
public override async Task ProcessMessageAsync(
Order message,
AzureServiceBusMessageContext messageContext,
MessagePumpCircuitBreakerOptions options,
...)
{
// Sets the time period the circuit breaker should wait before retrying to receive messages.
// A.k.a. the time period the circuit is closed (default: 30 seconds).
options.MessageRecoveryPeriod = TimeSpan.FromSeconds(15);

// Sets the time period the circuit breaker should wait between each message after the circuit was closed, during recovery.
// A.k.a. the time interval to receive messages during which the circuit is half-open (default: 10 seconds).
options.MessageIntervalDuringRecovery = TimeSpan.FromSeconds(1.5);
}
```

### Pause message processing for a fixed period of time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ public class AzureServiceBusSystemProperties
private AzureServiceBusSystemProperties(ServiceBusReceivedMessage message)
{
Guard.NotNull(message, nameof(message), "Requires an Azure Service Bus received message to construct a set of Azure Service Bus system properties");

DeadLetterSource = message.DeadLetterSource;
DeliveryCount = message.DeliveryCount;
EnqueuedSequenceNumber = message.EnqueuedSequenceNumber;
EnqueuedTime = message.EnqueuedTime;
LockToken = message.LockToken;
IsLockTokenSet = message.LockToken != null;
IsLockTokenSet = message.LockToken != null && message.LockToken != Guid.Empty.ToString();
LockedUntil = message.LockedUntil;
IsReceived = message.SequenceNumber > -1;
SequenceNumber = message.SequenceNumber;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public static class ProcessMessageEventArgsExtensions
/// <exception cref="TypeNotFoundException">Thrown when the no Azure Service Bus receiver could be found on the <paramref name="args"/>.</exception>
/// <exception cref="InvalidOperationException">Thrown when no value could be found for the Azure Service Bus receiver on the <paramref name="args"/>.</exception>
/// <exception cref="InvalidCastException">Thrown when the value for the Azure Service Bus receiver on the <paramref name="args"/> wasn't the expected type.</exception>
[Obsolete("Service Bus receiver is used internally instead, no need to go via the processor")]
public static ServiceBusReceiver GetServiceBusReceiver(this ProcessMessageEventArgs args)
{
Guard.NotNull(args, nameof(args), "Requires an event args instance to retrieve the original Service Bus message receiver");
Expand Down
Loading
Loading