Skip to content

Commit 1deb7bd

Browse files
authored
WATCH is now working in the same time as MULTI when called inside a MULTI (#3027)
1 parent 68e0809 commit 1deb7bd

File tree

4 files changed

+54
-30
lines changed

4 files changed

+54
-30
lines changed

src/main/java/io/lettuce/core/FutureSyncInvocationHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ private static boolean isTransactionActive(StatefulConnection<?, ?> connection)
100100

101101
private static boolean isTxControlMethod(String methodName, Object[] args) {
102102

103-
if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard")) {
103+
if (methodName.equals("exec") || methodName.equals("multi") || methodName.equals("discard")
104+
|| methodName.equals("watch")) {
104105
return true;
105106
}
106107

src/main/java/io/lettuce/core/StatefulRedisConnectionImpl.java

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,6 @@ public boolean isMulti() {
166166
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
167167

168168
RedisCommand<K, V, T> toSend = preProcessCommand(command);
169-
170-
potentiallyEnableMulti(command);
171-
172169
return super.dispatch(toSend);
173170
}
174171

@@ -179,35 +176,21 @@ public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
179176

180177
commands.forEach(o -> {
181178
RedisCommand<K, V, ?> command = preProcessCommand(o);
182-
183179
sentCommands.add(command);
184-
potentiallyEnableMulti(command);
185180
});
186181

187182
return super.dispatch(sentCommands);
188183
}
189184

190-
private void potentiallyEnableMulti(RedisCommand<K, V, ?> command) {
191-
192-
if (command.getType().toString().equals(MULTI.name())) {
193-
194-
multi = (multi == null ? new MultiOutput<>(codec) : multi);
195-
196-
if (command instanceof CompleteableCommand) {
197-
((CompleteableCommand<?>) command).onComplete((ignored, e) -> {
198-
if (e != null) {
199-
multi = null;
200-
}
201-
});
202-
}
203-
}
204-
}
205-
185+
// TODO [tihomir.mateev] Refactor to include as part of the Command interface
186+
// All these if statements clearly indicate this is a problem best solve by each command
187+
// (defining a pre and post processing behaviour of the command)
206188
protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {
207189

208190
RedisCommand<K, V, T> local = command;
191+
String commandType = command.getType().toString();
209192

210-
if (local.getType().toString().equals(AUTH.name())) {
193+
if (commandType.equals(AUTH.name())) {
211194
local = attachOnComplete(local, status -> {
212195
if ("OK".equals(status)) {
213196

@@ -224,7 +207,7 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
224207
});
225208
}
226209

227-
if (local.getType().toString().equals(SELECT.name())) {
210+
if (commandType.equals(SELECT.name())) {
228211
local = attachOnComplete(local, status -> {
229212
if ("OK".equals(status)) {
230213
Long db = CommandArgsAccessor.getFirstInteger(command.getArgs());
@@ -235,30 +218,30 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
235218
});
236219
}
237220

238-
if (local.getType().toString().equals(READONLY.name())) {
221+
if (commandType.equals(READONLY.name())) {
239222
local = attachOnComplete(local, status -> {
240223
if ("OK".equals(status)) {
241224
state.setReadOnly(true);
242225
}
243226
});
244227
}
245228

246-
if (local.getType().toString().equals(READWRITE.name())) {
229+
if (commandType.equals(READWRITE.name())) {
247230
local = attachOnComplete(local, status -> {
248231
if ("OK".equals(status)) {
249232
state.setReadOnly(false);
250233
}
251234
});
252235
}
253236

254-
if (local.getType().toString().equals(DISCARD.name())) {
237+
if (commandType.equals(DISCARD.name())) {
255238
if (multi != null) {
256239
multi.cancel();
257240
multi = null;
258241
}
259242
}
260243

261-
if (local.getType().toString().equals(EXEC.name())) {
244+
if (commandType.equals(EXEC.name())) {
262245
MultiOutput<K, V> multiOutput = this.multi;
263246
this.multi = null;
264247
if (multiOutput == null) {
@@ -267,10 +250,25 @@ protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> comm
267250
local.setOutput((MultiOutput) multiOutput);
268251
}
269252

270-
if (multi != null && !local.getType().toString().equals(MULTI.name())) {
253+
if (multi != null && !commandType.equals(MULTI.name()) && !commandType.equals(WATCH.name())) {
254+
// ignore MULTI and WATCH commands nested in another MULTI
271255
local = new TransactionalCommand<>(local);
272256
multi.add(local);
273257
}
258+
259+
if (commandType.equals(MULTI.name())) {
260+
261+
multi = (multi == null ? new MultiOutput<>(codec) : multi);
262+
263+
if (command instanceof CompleteableCommand) {
264+
((CompleteableCommand<?>) command).onComplete((ignored, e) -> {
265+
if (e != null) {
266+
multi = null;
267+
}
268+
});
269+
}
270+
}
271+
274272
return local;
275273
}
276274

src/test/java/io/lettuce/core/commands/TransactionCommandIntegrationTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,30 @@ void errorInMulti() {
142142
assertThat((String) values.get(2)).isEqualTo(value);
143143
}
144144

145+
@Test
146+
void errorWhileWatchInsideMulti() {
147+
assertThat(redis.multi()).isEqualTo("OK");
148+
assertThat(redis.set(key, value)).isEqualTo(null);
149+
assertThatThrownBy(() -> redis.watch(key)).isInstanceOf(RedisCommandExecutionException.class)
150+
.hasMessageContaining("ERR WATCH inside MULTI is not allowed");
151+
assertThat(redis.get(key)).isEqualTo(null);
152+
TransactionResult values = redis.exec();
153+
assertThat(values.wasDiscarded()).isFalse();
154+
assertThat((String) values.get(0)).isEqualTo("OK");
155+
assertThat((String) values.get(1)).isEqualTo(value);
156+
}
157+
158+
@Test
159+
void errorWhileMultiInsideMulti() {
160+
assertThat(redis.multi()).isEqualTo("OK");
161+
assertThat(redis.set(key, value)).isEqualTo(null);
162+
assertThatThrownBy(redis::multi).isInstanceOf(RedisCommandExecutionException.class)
163+
.hasMessageContaining("ERR MULTI calls can not be nested");
164+
assertThat(redis.get(key)).isEqualTo(null);
165+
TransactionResult values = redis.exec();
166+
assertThat(values.wasDiscarded()).isFalse();
167+
}
168+
145169
@Test
146170
void execWithoutMulti() {
147171
assertThatThrownBy(redis::exec).isInstanceOf(RedisCommandExecutionException.class)

src/test/java/io/lettuce/test/ReactiveSyncInvocationHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@ protected Object handleInvocation(Object proxy, Method method, Object[] args) th
7575
if (result instanceof Mono<?>) {
7676
Mono<?> mono = (Mono<?>) result;
7777

78-
if (!method.getName().equals("exec") && !method.getName().equals("multi")) {
78+
if (!method.getName().equals("exec") && !method.getName().equals("multi")
79+
&& !method.getName().equals("watch")) {
7980
if (connection instanceof StatefulRedisConnection && ((StatefulRedisConnection) connection).isMulti()) {
8081
mono.subscribe();
8182
return null;

0 commit comments

Comments
 (0)