-
Notifications
You must be signed in to change notification settings - Fork 189
Description
Hi !
Quarkus version : 3.5.0
smallrye-reactive-messaging : 4.10.1
I created the following consumer :
@Incoming("topicIn") @Outgoing("topicOut") public Multi<Message<String>> consumeIncoming(Message<String> message) { return service.doSomething(message).onFailure().invoke(e -> log.error( "error, sould be sent to DLQ", e)); }
The service creates 2 messages to be sent to @Outgoing Channel, and also sends another message using a producer (with MutinyEmitter).
public Multi<Message<String>> doSomething(Message<String> incomingMessage) { return doSomethingThatReturnUni(message) .onItem().transformToMulti(element -> { //buildMessages creates 2 Kafka Messages from incoming message return Multi.createFrom().items(incomingMessage).map(this::buildMessages) .onCompletion().invoke(() -> { kafkaProducer.sendCleanMessage(incomingMessage) .subscribe().with( success -> { log.debug("clean message sent : {}", incomingMessage.getPayload()); //manual ack to prevent message from going into DLQ incomingMessage.ack(); }, failure -> { log.error("Operation failed : {}, error:{}", incomingMessage.getPayload(), failure.getMessage()); } ); } ); }).onFailure().transform(ex -> new RuntimeException("Processing failed for " + incomingMessage.getPayload(), ex));
Expected behavior :
- On success, both messages are sent to @Outgoing topic, one message is sent to the other channel and incoming message is acked (offset is commited)
- If there is an error while processing (exception thrown in buildMessages), exception is propagated to pipeline, incoming message is sent to DLQ and offset is commited <= offset is not commited
I tried not to do manual ack using following signature
public Multi<String> consumeIncoming(String message)
But it seems @Outgoing does not wait the Multi result to consider the message consumed : The Reactive Messaging connector commits the message when the method returns, not when the returned Multi completes.
It seems that using synchronous code would work : if an exception is thrown, message goes into DLQ and message should be acked. I can't figure what to do to make this work asynchronously...maybe change @acknowledgment(Acknowledgment.Strategy ?