-
Notifications
You must be signed in to change notification settings - Fork 189
Open
Description
In my code I use kafka for processing and then saving in the redis.
@Incoming("words-in")
public Uni<Void> sink(String word) {
return Uni.createFrom().item(word)
.onItem().delayIt().by(Duration.ofSeconds(1))
.onCancellation()
.invoke(() -> System.out.println("⬆️ Cancelled"))
.onTermination()
.invoke(() -> System.out.println("🔚 Uni terminated"))
.flatMap(w -> kv.set("lastv", w)) //<------- THIS IS REDIS COMMAND
.onItem()
.invoke(() -> {
System.out.println("Received word: " + word);});
}
Whenever there is shutdown, I get this kind of exception:
2025-08-21 11:04:47,790 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-10) SRMSG18228: A failure has been reported for Kafka topics '[words]': java.lang.IllegalStateException: Client is closed
at io.vertx.core.net.impl.NetClientImpl.connect(NetClientImpl.java:215)
at io.vertx.core.net.impl.NetClientImpl.connect(NetClientImpl.java:127)
at io.vertx.redis.client.impl.RedisConnectionManager$RedisConnectionProvider.connect(RedisConnectionManager.java:167)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.connect(SimpleConnectionPool.java:253)
at io.vertx.core.net.impl.pool.SimpleConnectionPool$Acquire$3.run(SimpleConnectionPool.java:595)
at io.vertx.core.net.impl.pool.Task.runNextTasks(Task.java:43)
at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:91)
It happens because redis has closed the connection, while there are still messages in-flight from kafka.
Is there some way to prevent redis from closing the connections before the last kafka message that has been ACK-ed is processed?
Metadata
Metadata
Assignees
Labels
No labels