-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Adds support for XPENDING IDLE parameter #2822
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2803,7 +2803,7 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group | |
| return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo); | ||
| } | ||
|
|
||
| public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) | ||
| public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) | ||
| { | ||
| var msg = GetStreamPendingMessagesMessage( | ||
| key, | ||
|
|
@@ -2812,12 +2812,13 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue | |
| maxId, | ||
| count, | ||
| consumerName, | ||
| minIdleTimeInMs, | ||
| flags); | ||
|
|
||
| return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>()); | ||
| } | ||
|
|
||
| public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) | ||
| public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) | ||
| { | ||
| var msg = GetStreamPendingMessagesMessage( | ||
| key, | ||
|
|
@@ -2826,6 +2827,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, | |
| maxId, | ||
| count, | ||
| consumerName, | ||
| minIdleTimeInMs, | ||
| flags); | ||
|
|
||
| return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>()); | ||
|
|
@@ -4300,9 +4302,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro | |
| /// Gets a message for <see href="https://redis.io/commands/xpending/"/>. | ||
| /// </summary> | ||
| /// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks> | ||
| private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags) | ||
| private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags) | ||
| { | ||
| // > XPENDING mystream mygroup - + 10 [consumer name] | ||
| // > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name] | ||
| // 1) 1) 1526569498055 - 0 | ||
| // 2) "Bob" | ||
| // 3) (integer)74170458 | ||
|
|
@@ -4316,16 +4318,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa | |
| throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0."); | ||
| } | ||
|
|
||
| var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5]; | ||
| var valuesLength = 4; | ||
| if (consumerName != RedisValue.Null) | ||
| { | ||
| valuesLength++; | ||
| } | ||
|
|
||
| values[0] = groupName; | ||
| values[1] = minId ?? StreamConstants.ReadMinValue; | ||
| values[2] = maxId ?? StreamConstants.ReadMaxValue; | ||
| values[3] = count; | ||
| if (minIdleTimeInMs is not null) | ||
| { | ||
| valuesLength += 2; | ||
| } | ||
| var values = new RedisValue[valuesLength]; | ||
|
|
||
| var offset = 0; | ||
|
|
||
| values[offset++] = groupName; | ||
| if (minIdleTimeInMs is not null) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
| { | ||
| values[offset++] = "IDLE"; | ||
| values[offset++] = minIdleTimeInMs; | ||
| } | ||
| values[offset++] = minId ?? StreamConstants.ReadMinValue; | ||
| values[offset++] = maxId ?? StreamConstants.ReadMaxValue; | ||
| values[offset++] = count; | ||
|
|
||
| if (consumerName != RedisValue.Null) | ||
| { | ||
| values[4] = consumerName; | ||
| values[offset++] = consumerName; | ||
| } | ||
|
|
||
| return Message.Create( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1154,8 +1154,8 @@ public void StreamPendingInfoGet() | |
| [Fact] | ||
| public void StreamPendingMessageInfoGet() | ||
| { | ||
| prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); | ||
| mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", CommandFlags.None); | ||
| prefixed.StreamPendingMessages("key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not following the concern here and likely showing my lack of knowledge about your setup. 1000 is the new arg, but this seems to be mocked and I'm not following how that could break. What is the "test setup" you're referencing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oh, if it is mocked: fine - ignore me |
||
| mock.Received().StreamPendingMessages("prefix:key", "group", 10, RedisValue.Null, "-", "+", 1000, CommandFlags.None); | ||
| } | ||
|
|
||
| [Fact] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we need to add an overload here, i.e. a second method with different parameters; otherwise, this is a hard binary break - we try very hard not to do that. If the compiler complains about two methods with optional parameters, we can work around that
(yes: technically adding methods to the interface is also problematic, but: it is problematic in different ways, and in reality we don't expect custom implementations of the
IDatabaseetc APIs)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on this. I'd leave the existing public method signatures as-is, and add new methods with additional required params as needed.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you have a suggestion on the overloads? This Redis API can satisfy a number of use cases depending on the parameters passed. minIdleTimeInMs can be used with various combinations of consumerName, minId, and maxId. If new overloads do not have defaults, several overloads would likely be needed to satisfy all use cases which complicates the public API.
With this many arguments I'd generally lean toward an class like StreamPendingMessagesArgs, but that seems to run afoul of the existing design.
The PR as it currently exists seemed like a reasonable compromise between backwards compatibility and adhering to the design.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking something like this (but @mgravell and @NickCraver are really the experts):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mgravell, would you like me to amend the PR to restore the existing overload and add a new one that places minIdleTimeInMs before min/maxId?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Going to try one last time. @mgravell , @NickCraver, what would you like to see? I'm happy to amend the PR if I can get some direction on the desired arguments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philon-msft a problem there is that there are awkward implicit operators that would make that ambiguous; the code as shown is technically a compile-time-break, but: it only applies in rare scenarios, and is in a way that I'm OK with (move the flags) - I'm going to push ahead as presented
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added to that: I'm already doing a "bump", so: good timing