-
Notifications
You must be signed in to change notification settings - Fork 3k
Description
Describe the bug
When under high load and also using Redis transactions, Redis cache result starts returning "QUEUED" instead of the function computed value or cached value.
Expected behavior
I'd expect either it to wait for Redis or call the underlying function and get a result. Without any transactions happening, "QUEUED" is never returned from a cache result.
Actual behavior
The CacheResult is "QUEUED".
How to Reproduce?
This is a test i wrote to reproduce this bug on our system:
%test.quarkus.redis.max-pool-size=100
%test.quarkus.redis.max-pool-waiting=400
package com.picky.creators.notification.core.template;
import static org.junit.jupiter.api.Assertions.*;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.logging.Log;
import io.quarkus.cache.CacheKey;
import io.quarkus.cache.CacheResult;
import io.quarkus.redis.datasource.RedisDataSource;
import io.quarkus.redis.datasource.transactions.TransactionResult;
import io.quarkus.redis.datasource.value.ValueCommands;
import jakarta.inject.Inject;
import jakarta.enterprise.context.ApplicationScoped;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.DisplayName;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.HashMap;
/**
* Complete test to reproduce the 'QUEUED' issue with Redis cache.
* This test includes all service classes needed to run the testRapidSuccessiveCalls test.
*/
@QuarkusTest
class TestRapidSuccessiveCalls {
@Inject
SimpleCacheTestService cacheService;
@Inject
RedisTransactionTestService transactionService;
@Test
@DisplayName("Test rapid successive calls")
void testRapidSuccessiveCalls() throws Exception {
final String key = "rapid-test";
final int cacheThreads = 100;
final int transactionThreads = 100;
final int callsPerThread = 10_000;
final ExecutorService executor = Executors.newFixedThreadPool(cacheThreads + transactionThreads);
final List<Future<Void>> futures = new ArrayList<>();
final AtomicInteger totalCalls = new AtomicInteger(0);
for (int t = 0; t < cacheThreads; t++) {
final int threadId = t;
futures.add(executor.submit(() -> {
for (int i = 0; i < callsPerThread; i++) {
final String result = cacheService.getCachedValue(key);
final int callNumber = totalCalls.incrementAndGet();
if (callNumber % 100 == 0) {
Log.infof("Cache Thread %d, call %d: '%s'", threadId, callNumber, result);
}
assertNotNull(result, String.format("Cache Thread %d, call %d should not be null", threadId, i));
assertNotEquals("QUEUED", result, String.format("Cache Thread %d, call %d should not be 'QUEUED'", threadId, i));
assertEquals("Value for: rapid-test", result, String.format("Cache Thread %d, call %d should be correct", threadId, i));
}
return null;
}));
}
for (int t = 0; t < transactionThreads; t++) {
final int threadId = t;
futures.add(executor.submit(() -> {
for (int i = 0; i < callsPerThread; i++) {
transactionService.performTransactionOperations("test-txn", threadId * 1000 + i);
final int callNumber = totalCalls.incrementAndGet();
if (callNumber % 100 == 0) {
Log.infof("Transaction Thread %d, operation %d completed", threadId, i);
}
}
return null;
}));
}
// Wait for all threads to complete
for (final Future<Void> future : futures) {
future.get();
}
executor.shutdown();
Log.infof("Completed %d concurrent operations across %d threads", totalCalls.get(), cacheThreads + transactionThreads);
}
@ApplicationScoped
public static class SimpleCacheTestService {
@CacheResult(cacheName = "testCache")
public String getCachedValue(@CacheKey final String key) {
final String result = "Value for: " + key;
Log.infof("Computing value for key: %s, result: %s", key, result);
try {
Thread.sleep(1000);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
}
return result;
}
}
@ApplicationScoped
public static class RedisTransactionTestService {
private final RedisDataSource redisDataSource;
private final ValueCommands<String, String> valueCommands;
public RedisTransactionTestService(final RedisDataSource redisDataSource) {
this.redisDataSource = redisDataSource;
this.valueCommands = redisDataSource.value(String.class);
}
public void performTransactionOperations(final String keyPrefix, final int operationId) {
final Map<String, String> entries = new HashMap<>();
entries.put(keyPrefix + ":key1:" + operationId, "value1:" + operationId);
entries.put(keyPrefix + ":key2:" + operationId, "value2:" + operationId);
entries.put(keyPrefix + ":key3:" + operationId, "value3:" + operationId);
try {
Log.infof("Starting transaction for operation %d", operationId);
final TransactionResult result = redisDataSource.withTransaction(tx -> {
for (final Map.Entry<String, String> entry : entries.entrySet()) {
valueCommands.setex(entry.getKey(), 60, entry.getValue());
}
});
if (result.discarded()) {
Log.warnf("Transaction discarded for operation %d", operationId);
} else {
Log.infof("Transaction completed for operation %d", operationId);
}
} catch (final Exception e) {
Log.errorf(e, "Transaction failed for operation %d", operationId);
}
}
}
}
Output of uname -a
or ver
Linux danprueitt-arch 6.15.2-arch1-1 #1 SMP PREEMPT_DYNAMIC Tue, 10 Jun 2025 21:32:33 +0000 x86_64 GNU/Linux
Output of java -version
openjdk 21.0.6 2025-01-21 LTS OpenJDK Runtime Environment Temurin-21.0.6+7 (build 21.0.6+7-LTS) OpenJDK 64-Bit Server VM Temurin-21.0.6+7 (build 21.0.6+7-LTS, mixed mode, sharing)
Quarkus version or git rev
3.23.3
Build tool (ie. output of mvnw --version
or gradlew --version
)
Apache Maven 3.9.10 (5f519b97e944483d878815739f519b2eade0a91d)
Additional information
No response