-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Description
Is your feature request related to a problem? Please describe.
Currently the ServiceBus Binder implementation doesn't honor the definition of the back-off retry settings defined by the Spring Cloud Stream configuration
consumer:
back-off-initial-interval: 1000
back-off-max-interval: 1000
back-off-multiplier: 2.0
max-attempts: 5Also, I couldn't find in the documentation any alternative do achieve the same behavior using the current set of configs.
Some of the Spring Cloud Stream configs seem to work perfectly fine, so I'm just confused why others wouldn't.
e.g. the following works fine without
stream:
bindings:
consumer-in-0:
destination: process_updates
group: process
binder: dms
binders:
dms:
type: servicebus
cih:
type: servicebusI checked the loaded configs, everything is loaded fine, just doesn't do anything.
Describe the solution you'd like
For consistency, honoring the configurations from the Spring Cloud Stream would be great, but also I couldn't find any other configuration that would achieve the same behavior.
Describe alternatives you've considered
The only alternative I managed to get working was to configure a RetryTemplate and use it to have a retry context on each message consumption. This seems to be the way that I got working in a more reasonable way:
@Bean("serviceBusRetryTemplate")
RetryTemplate retryTemplate() {
var properties = bindingServiceProperties.getConsumerProperties("dmsConsumer-in-0");
RetryTemplate r = new RetryTemplate();
ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();
exponentialBackOffPolicy.setInitialInterval(properties.getBackOffInitialInterval());
exponentialBackOffPolicy.setMultiplier(properties.getBackOffMultiplier());
exponentialBackOffPolicy.setMaxInterval(properties.getBackOffMaxInterval());
r.setBackOffPolicy(exponentialBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(properties.getMaxAttempts());
r.setRetryPolicy(simpleRetryPolicy);
return r;
}
@Bean
public Consumer<Message<String>> dmsConsumer() {
return message -> {
try {
retryTemplate.execute(context -> {
final ServiceBusMessageDto messageDto = objectMapper.readValue(message.getPayload(), ServiceBusMessageDto.class);
processMessage(messageDto);
return null;
});
} catch (Exception e) {
log.error("Error processing message: {}", e.getMessage(), e);
throw new RequeueCurrentMessageException("Failed to process message", e);
}
};
}However this seems to be blocking. If there's several messages to be processed, the consumer will process each message sequentially... I could put this into a thread but something feels wrong about going through this loops to just have proper retrial mechanism work with a message queue.
I also tried using the @retryable annotation, however this didn't work at all, maybe I'm missing something
@Retryable(
maxAttemptsExpression = "6",
backoff = @Backoff(
delayExpression = "1000",
multiplierExpression = "5.0",
maxDelayExpression = "600000"
)
)Additional context
In alternative to making this, maybe adding to the docs a sample on how it's envision to deal with retries, and particularly backoff retries which I think it's a fundamental use case for a service bus integration pattern with eventual consistency.
Information Checklist
Kindly make sure that you have added all the following information above and checkoff the required fields otherwise we will treat the issuer as an incomplete report
- Description Added
- Expected solution specified
Metadata
Metadata
Labels
Type
Projects
Status