Skip to content

Commit 90c2de1

Browse files
sabbey37sazzad16
andauthored
JedisPubSub and BinaryJedisPubSub PING don't support optional argument (#2254)
* JedisPubSub and BinaryJedisPubSub PING don't support optional argument * PR review changes * Removes unused import Co-authored-by: M Sazzadul Hoque <[email protected]>
1 parent b54ce7d commit 90c2de1

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed

src/main/java/redis/clients/jedis/BinaryJedisPubSub.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static redis.clients.jedis.Protocol.Keyword.MESSAGE;
44
import static redis.clients.jedis.Protocol.Keyword.PMESSAGE;
5+
import static redis.clients.jedis.Protocol.Keyword.PONG;
56
import static redis.clients.jedis.Protocol.Keyword.PSUBSCRIBE;
67
import static redis.clients.jedis.Protocol.Keyword.PUNSUBSCRIBE;
78
import static redis.clients.jedis.Protocol.Keyword.SUBSCRIBE;
@@ -34,6 +35,9 @@ public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
3435
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
3536
}
3637

38+
public void onPong(byte[] pattern) {
39+
}
40+
3741
public void unsubscribe() {
3842
client.unsubscribe();
3943
client.flush();
@@ -64,6 +68,16 @@ public void punsubscribe(byte[]... patterns) {
6468
client.flush();
6569
}
6670

71+
public void ping() {
72+
client.ping();
73+
client.flush();
74+
}
75+
76+
public void ping(byte[] argument) {
77+
client.ping(argument);
78+
client.flush();
79+
}
80+
6781
public boolean isSubscribed() {
6882
return subscribedChannels > 0;
6983
}
@@ -115,6 +129,9 @@ private void process(Client client) {
115129
subscribedChannels = ((Long) reply.get(2)).intValue();
116130
final byte[] bpattern = (byte[]) reply.get(1);
117131
onPUnsubscribe(bpattern, subscribedChannels);
132+
} else if (Arrays.equals(PONG.raw, resp)) {
133+
final byte[] bpattern = (byte[]) reply.get(1);
134+
onPong(bpattern);
118135
} else {
119136
throw new JedisException("Unknown message type: " + firstObj);
120137
}

src/main/java/redis/clients/jedis/JedisPubSub.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,14 @@ public void ping() {
9999
client.flush();
100100
}
101101

102+
public void ping(String argument) {
103+
if (client == null) {
104+
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
105+
}
106+
client.ping(argument);
107+
client.flush();
108+
}
109+
102110
public boolean isSubscribed() {
103111
return subscribedChannels > 0;
104112
}

src/test/java/redis/clients/jedis/tests/commands/PublishSubscribeCommandsTest.java

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package redis.clients.jedis.tests.commands;
22

3+
import static org.junit.Assert.assertArrayEquals;
34
import static org.junit.Assert.assertEquals;
45
import static org.junit.Assert.assertTrue;
56
import static org.junit.Assert.fail;
67

78
import java.io.IOException;
89
import java.net.UnknownHostException;
10+
import java.util.ArrayList;
911
import java.util.Arrays;
12+
import java.util.Collections;
1013
import java.util.HashMap;
1114
import java.util.List;
1215
import java.util.Map;
@@ -112,6 +115,41 @@ public void onUnsubscribe(String channel, int subscribedChannels) {
112115
assertEquals(0L, latchUnsubscribed.getCount());
113116
}
114117

118+
@Test
119+
public void pubSubChannelWithPingPongWithArgument() throws InterruptedException {
120+
final CountDownLatch latchUnsubscribed = new CountDownLatch(1);
121+
final CountDownLatch latchReceivedPong = new CountDownLatch(1);
122+
final List<String> pongPatterns = new ArrayList<>();
123+
jedis.subscribe(new JedisPubSub() {
124+
125+
@Override
126+
public void onSubscribe(String channel, int subscribedChannels) {
127+
publishOne("testchan1", "hello");
128+
}
129+
130+
@Override
131+
public void onMessage(String channel, String message) {
132+
this.ping("hi!");
133+
}
134+
135+
@Override
136+
public void onPong(String pattern) {
137+
pongPatterns.add(pattern);
138+
latchReceivedPong.countDown();
139+
unsubscribe();
140+
}
141+
142+
@Override
143+
public void onUnsubscribe(String channel, int subscribedChannels) {
144+
latchUnsubscribed.countDown();
145+
}
146+
}, "testchan1");
147+
148+
assertEquals(0L, latchReceivedPong.getCount());
149+
assertEquals(0L, latchUnsubscribed.getCount());
150+
assertEquals(Collections.singletonList("hi!"), pongPatterns);
151+
}
152+
115153
@Test
116154
public void pubSubNumPat() {
117155
jedis.psubscribe(new JedisPubSub() {
@@ -301,6 +339,75 @@ public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
301339
}, SafeEncoder.encode("foo.*"), SafeEncoder.encode("bar.*"));
302340
}
303341

342+
@Test
343+
public void binaryPubSubChannelWithPingPong() throws InterruptedException {
344+
final CountDownLatch latchUnsubscribed = new CountDownLatch(1);
345+
final CountDownLatch latchReceivedPong = new CountDownLatch(1);
346+
347+
jedis.subscribe(new BinaryJedisPubSub() {
348+
349+
@Override
350+
public void onSubscribe(byte[] channel, int subscribedChannels) {
351+
publishOne("testchan1", "hello");
352+
}
353+
354+
@Override
355+
public void onMessage(byte[] channel, byte[] message) {
356+
this.ping();
357+
}
358+
359+
@Override
360+
public void onPong(byte[] pattern) {
361+
latchReceivedPong.countDown();
362+
unsubscribe();
363+
}
364+
365+
@Override
366+
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
367+
latchUnsubscribed.countDown();
368+
}
369+
}, SafeEncoder.encode("testchan1"));
370+
assertEquals(0L, latchReceivedPong.getCount());
371+
assertEquals(0L, latchUnsubscribed.getCount());
372+
}
373+
374+
@Test
375+
public void binaryPubSubChannelWithPingPongWithArgument() throws InterruptedException {
376+
final CountDownLatch latchUnsubscribed = new CountDownLatch(1);
377+
final CountDownLatch latchReceivedPong = new CountDownLatch(1);
378+
final List<byte[]> pongPatterns = new ArrayList<>();
379+
final byte[] pingMessage = SafeEncoder.encode("hi!");
380+
381+
jedis.subscribe(new BinaryJedisPubSub() {
382+
383+
@Override
384+
public void onSubscribe(byte[] channel, int subscribedChannels) {
385+
publishOne("testchan1", "hello");
386+
}
387+
388+
@Override
389+
public void onMessage(byte[] channel, byte[] message) {
390+
this.ping(pingMessage);
391+
}
392+
393+
@Override
394+
public void onPong(byte[] pattern) {
395+
pongPatterns.add(pattern);
396+
latchReceivedPong.countDown();
397+
unsubscribe();
398+
}
399+
400+
@Override
401+
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
402+
latchUnsubscribed.countDown();
403+
}
404+
}, SafeEncoder.encode("testchan1"));
405+
406+
assertEquals(0L, latchReceivedPong.getCount());
407+
assertEquals(0L, latchUnsubscribed.getCount());
408+
assertArrayEquals(pingMessage, pongPatterns.get(0));
409+
}
410+
304411
@Test
305412
public void binarySubscribeLazily() throws UnknownHostException, IOException,
306413
InterruptedException {

0 commit comments

Comments
 (0)