Skip to content

Commit 76764b9

Browse files
authored
Fix PENDING entries of xinfoStreamFull method (#2988)
1 parent c63a9b1 commit 76764b9

File tree

4 files changed

+43
-8
lines changed

4 files changed

+43
-8
lines changed

src/main/java/redis/clients/jedis/BuilderFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1315,7 +1315,7 @@ private Map<String, Builder> createDecoderMap() {
13151315
tempMappingFunctions.put(StreamConsumerFullInfo.NAME, STRING);
13161316
tempMappingFunctions.put(StreamConsumerFullInfo.SEEN_TIME, LONG);
13171317
tempMappingFunctions.put(StreamConsumerFullInfo.PEL_COUNT, LONG);
1318-
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, LONG_LIST);
1318+
tempMappingFunctions.put(StreamConsumerFullInfo.PENDING, ENCODED_OBJECT_LIST);
13191319

13201320
return tempMappingFunctions;
13211321
}
@@ -1354,7 +1354,7 @@ private Map<String, Builder> createDecoderMap() {
13541354
Map<String, Builder> tempMappingFunctions = new HashMap<>();
13551355
tempMappingFunctions.put(StreamGroupFullInfo.NAME, STRING);
13561356
tempMappingFunctions.put(StreamGroupFullInfo.CONSUMERS, STREAM_CONSUMER_FULL_INFO_LIST);
1357-
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, STRING_LIST);
1357+
tempMappingFunctions.put(StreamGroupFullInfo.PENDING, ENCODED_OBJECT_LIST);
13581358
tempMappingFunctions.put(StreamGroupFullInfo.LAST_DELIVERED, STREAM_ENTRY_ID);
13591359
tempMappingFunctions.put(StreamGroupFullInfo.PEL_COUNT, LONG);
13601360

src/main/java/redis/clients/jedis/resps/StreamConsumerFullInfo.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.Serializable;
44
import java.util.List;
55
import java.util.Map;
6+
import redis.clients.jedis.StreamEntryID;
67

78
/**
89
* This class holds information about a stream consumer with command <code>xinfo stream mystream full<code/>.
@@ -19,16 +20,18 @@ public class StreamConsumerFullInfo implements Serializable {
1920
private final String name;
2021
private final Long seenTime;
2122
private final Long pelCount;
22-
private final List<Long> pending;
23+
private final List<List<Object>> pending;
2324
private final Map<String, Object> consumerInfo;
2425

2526
@SuppressWarnings("unchecked")
2627
public StreamConsumerFullInfo(Map<String, Object> map) {
2728
consumerInfo = map;
2829
name = (String) map.get(NAME);
2930
seenTime = (Long) map.get(SEEN_TIME);
30-
pending = (List<Long>) map.get(PENDING);
31+
pending = (List<List<Object>>) map.get(PENDING);
3132
pelCount = (Long) map.get(PEL_COUNT);
33+
34+
pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
3235
}
3336

3437
public String getName() {
@@ -43,7 +46,7 @@ public Long getPelCount() {
4346
return pelCount;
4447
}
4548

46-
public List<Long> getPending() {
49+
public List<List<Object>> getPending() {
4750
return pending;
4851
}
4952

src/main/java/redis/clients/jedis/resps/StreamGroupFullInfo.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class StreamGroupFullInfo implements Serializable {
2121

2222
private final String name;
2323
private final List<StreamConsumerFullInfo> consumers;
24-
private final List<String> pending;
24+
private final List<List<Object>> pending;
2525
private final Long pelCount;
2626
private final StreamEntryID lastDeliveredId;
2727
private final Map<String, Object> groupFullInfo;
@@ -35,10 +35,11 @@ public StreamGroupFullInfo(Map<String, Object> map) {
3535
groupFullInfo = map;
3636
name = (String) map.get(NAME);
3737
consumers = (List<StreamConsumerFullInfo>) map.get(CONSUMERS);
38-
pending = (List<String>) map.get(PENDING);
38+
pending = (List<List<Object>>) map.get(PENDING);
3939
lastDeliveredId = (StreamEntryID) map.get(LAST_DELIVERED);
4040
pelCount = (Long) map.get(PEL_COUNT);
4141

42+
pending.stream().forEach(entry -> entry.set(0, new StreamEntryID((String) entry.get(0))));
4243
}
4344

4445
public String getName() {
@@ -49,7 +50,7 @@ public List<StreamConsumerFullInfo> getConsumers() {
4950
return consumers;
5051
}
5152

52-
public List<String> getPending() {
53+
public List<List<Object>> getPending() {
5354
return pending;
5455
}
5556

src/test/java/redis/clients/jedis/commands/jedis/StreamsCommandsTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -837,7 +837,38 @@ public void xinfo() throws InterruptedException {
837837
} catch (JedisException e) {
838838
assertEquals("ERR no such key", e.getMessage());
839839
}
840+
}
841+
842+
@Test
843+
public void xinfoStreamFullWithPending() {
844+
845+
Map<String, String> map = singletonMap("f1", "v1");
846+
StreamEntryID id1 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
847+
StreamEntryID id2 = jedis.xadd("streamfull2", (StreamEntryID) null, map);
848+
jedis.xgroupCreate("streamfull2", "xreadGroup-group", null, false);
849+
850+
Map<String, StreamEntryID> streamQeury1 = singletonMap("streamfull2", StreamEntryID.UNRECEIVED_ENTRY);
851+
List<Entry<String, List<StreamEntry>>> range = jedis.xreadGroup("xreadGroup-group", "xreadGroup-consumer",
852+
XReadGroupParams.xReadGroupParams().count(1), streamQeury1);
853+
assertEquals(1, range.size());
854+
assertEquals(1, range.get(0).getValue().size());
840855

856+
StreamFullInfo full = jedis.xinfoStreamFull("streamfull2");
857+
assertEquals(1, full.getGroups().size());
858+
StreamGroupFullInfo group = full.getGroups().get(0);
859+
assertEquals("xreadGroup-group", group.getName());
860+
861+
assertEquals(1, group.getPending().size());
862+
List<Object> groupPendingEntry = group.getPending().get(0);
863+
assertEquals(id1, groupPendingEntry.get(0));
864+
assertEquals("xreadGroup-consumer", groupPendingEntry.get(1));
865+
866+
assertEquals(1, group.getConsumers().size());
867+
StreamConsumerFullInfo consumer = group.getConsumers().get(0);
868+
assertEquals("xreadGroup-consumer", consumer.getName());
869+
assertEquals(1, consumer.getPending().size());
870+
List<Object> consumerPendingEntry = consumer.getPending().get(0);
871+
assertEquals(id1, consumerPendingEntry.get(0));
841872
}
842873

843874
@Test

0 commit comments

Comments
 (0)