Error when using withTransaction in RedisDataSource in clustered mode #32361

@stefanorg

Describe the bug

Hi all,
I want to execute this code (basically a rate-limit algorithm), and everything works fine:


// redisDataSource is of type io.quarkus.redis.datasource.RedisDataSource

KeyCommands<String> keys = redisDataSource.key();
SortedSetCommands<String, Long> z = redisDataSource.sortedSet(String.class, Long.class);
// remove all expired elements (those outside the current window)
z.zremrangebyscore(_key, ScoreRange.from(0, interval));
// add this request
z.zadd(_key, now, now);
// count all requests
List<ScoredValue<Long>> scoredValues = z.zrangeWithScores(_key, 0, -1);
// set the key to expire
keys.expire(_key, INTERVAL_SECONDS);
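For readers unfamiliar with the pattern, the four commands above implement a sliding-window rate limiter on a sorted set. Below is a minimal in-memory sketch of the same steps with no Redis involved. The class name `SlidingWindowSketch`, the `windowMillis` and `limit` parameters, and the `size() <= limit` check are assumptions for illustration, not part of the original code. A `TreeMap` stands in for the sorted set, and the final `expire` step is not modelled.

```java
import java.util.TreeMap;

public class SlidingWindowSketch {
    // Stand-in for the Redis sorted set: score (timestamp) -> member.
    private final TreeMap<Long, Long> entries = new TreeMap<>();
    private final long windowMillis; // hypothetical window length
    private final int limit;         // hypothetical max requests per window

    public SlidingWindowSketch(long windowMillis, int limit) {
        this.windowMillis = windowMillis;
        this.limit = limit;
    }

    public boolean shouldAllow(long now) {
        long windowStart = now - windowMillis;
        // like ZREMRANGEBYSCORE key 0 windowStart: drop entries outside the window
        entries.headMap(windowStart, true).clear();
        // like ZADD key now now: record this request
        entries.put(now, now);
        // like ZRANGE key 0 -1: count what is left in the window
        return entries.size() <= limit;
    }

    public static void main(String[] args) {
        SlidingWindowSketch limiter = new SlidingWindowSketch(1000, 2);
        System.out.println(limiter.shouldAllow(100));
        System.out.println(limiter.shouldAllow(200));
        System.out.println(limiter.shouldAllow(300));
        System.out.println(limiter.shouldAllow(1250));
    }
}
```

As in the Redis version, a request is recorded before it is counted, so rejected requests still occupy a slot in the window. The steps only behave as one unit if nothing interleaves between them, which is the motivation for wrapping them in MULTI.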

Based on the documentation (https://quarkus.io/guides/redis-reference), I want to execute those commands in a transaction (using MULTI), so I changed the code to use withTransaction:

TransactionResult transactionResult = redisDataSource.withTransaction(tx -> {
    TransactionalKeyCommands<String> keys = tx.key();
    TransactionalSortedSetCommands<String, Long> z = tx.sortedSet(String.class, Long.class);
    // remove all expired elements (those outside the current window)
    z.zremrangebyscore(_key, ScoreRange.from(0, interval));
    // add this request
    z.zadd(_key, now, now);
    // count all requests
    z.zrangeWithScores(_key, 0, -1);
    // set the key to expire
    keys.expire(_key, INTERVAL_SECONDS);
});

But this way I get different kinds of exceptions when the code runs.

Sometimes this:

Request failed: java.lang.IllegalStateException: Unable to add command to the current transaction
        at io.quarkus.redis.runtime.datasource.AbstractTransactionalCommands.queuedOrDiscard(AbstractTransactionalCommands.java:20)
        at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:77)
        at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onItem(UniOnItemConsume.java:42)
        at io.smallrye.mutiny.vertx.AsyncResultUni.lambda$subscribe$1(AsyncResultUni.java:35)
        at io.smallrye.mutiny.vertx.DelegatingHandler.handle(DelegatingHandler.java:25)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureImpl$ListenerArray.onSuccess(FutureImpl.java:262)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
        at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
        at io.vertx.redis.client.impl.RedisClusterConnection.lambda$send$7(RedisClusterConnection.java:330)
        at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
        at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
        at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
        at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
        at io.vertx.redis.client.impl.RedisStandaloneConnection.handle(RedisStandaloneConnection.java:409)
        at io.vertx.redis.client.impl.RESPParser.handleResponse(RESPParser.java:316)
        at io.vertx.redis.client.impl.RESPParser.handleNumber(RESPParser.java:147)
        at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:91)
        at io.vertx.redis.client.impl.RESPParser.handle(RESPParser.java:24)
        at io.vertx.core.net.impl.NetSocketImpl.lambda$new$1(NetSocketImpl.java:100)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:239)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:129)
        at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:414)
        at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:55)
        at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:158)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:390)
        at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:157)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)

Sometimes I get ERR MULTI calls can not be nested:

2023-03-31 17:37:25,768 ERROR [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (executor-thread-0) Request failed: java.util.concurrent.CompletionException: ERR MULTI calls can not be nested
        at io.smallrye.mutiny.operators.uni.UniBlockingAwait.await(UniBlockingAwait.java:79)
        at io.smallrye.mutiny.groups.UniAwait.atMost(UniAwait.java:65)
        at io.quarkus.redis.runtime.datasource.BlockingRedisDataSourceImpl.withTransaction(BlockingRedisDataSourceImpl.java:72)
        at io.quarkus.redis.datasource.RedisDataSource_16a9813a62a770a26823359040f690d9904a090a_Synthetic_ClientProxy.withTransaction(Unknown Source)
        at it.argosoft.poc.ratelimit.service.TokenBucketRateLimitingService.shouldAllow(TokenBucketRateLimitingService.java:58)
        at it.argosoft.poc.ratelimit.service.TokenBucketRateLimitingService_ClientProxy.shouldAllow(Unknown Source)
        at it.argosoft.poc.ratelimit.filters.ApiRateLimitFilter.filter(ApiRateLimitFilter.java:43)
        at org.jboss.resteasy.reactive.server.handlers.ResourceRequestFilterHandler.handle(ResourceRequestFilterHandler.java:48)
        at io.quarkus.resteasy.reactive.server.runtime.QuarkusResteasyReactiveRequestContext.invokeHandler(QuarkusResteasyReactiveRequestContext.java:104)
        at org.jboss.resteasy.reactive.common.core.AbstractResteasyReactiveContext.run(AbstractResteasyReactiveContext.java:145)
        at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
        at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2449)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1478)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: ERR MULTI calls can not be nested

The same happens if I use withTransaction on the reactive variant, ReactiveRedisDataSource.

What am I doing wrong?
Thanks

Expected behavior

No response

Actual behavior

No response

How to Reproduce?

No response

Output of uname -a or ver

No response

Output of java -version

No response

GraalVM version (if different from Java)

No response

Quarkus version or git rev

2.16.5

Build tool (ie. output of mvnw --version or gradlew --version)

./mvnw --version
Warning: JAVA_HOME environment variable is not set.
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /home/scorallo/.m2/wrapper/dists/apache-maven-3.8.6-bin/67568434/apache-maven-3.8.6
Java version: 17.0.6, vendor: Amazon.com Inc., runtime: /home/scorallo/.jdks/corretto-17.0.6
Default locale: it_IT, platform encoding: UTF-8
OS name: "linux", version: "5.19.0-38-generic", arch: "amd64", family: "unix"

Additional information

No response
