Skip to content

Commit 2f52c95

Browse files
Adds support for XPENDING IDLE parameter (#2822)
* Adds support for XPENDING IDLE parameter fixes #2432 * Increase delay to ensure sufficient time for idle * PR feedback - Add release notes - Add overload to StreamPendingMessages/StreamPendingMessagesAsync to preserve back compat --------- Co-authored-by: Marc Gravell <[email protected]>
1 parent 038f3de commit 2f52c95

File tree

10 files changed

+110
-22
lines changed

10 files changed

+110
-22
lines changed

docs/ReleaseNotes.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ Current package versions:
1111
- Add `HGETDEL`, `HGETEX` and `HSETEX` support ([#2863 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2863))
1212
- Fix key-prefix omission in `SetIntersectionLength` and `SortedSet{Combine[WithScores]|IntersectionLength}` ([#2863 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2863))
1313
- Add `Condition.SortedSet[Not]ContainsStarting` condition for transactions ([#2638 by ArnoKoll](https://github.com/StackExchange/StackExchange.Redis/pull/2638))
14-
14+
- Add support for XPENDING Idle time filter ([#2822 by david-brink-talogy](https://github.com/StackExchange/StackExchange.Redis/pull/2822))
15+
-
1516
## 2.8.58
1617

1718
- Fix [#2679](https://github.com/StackExchange/StackExchange.Redis/issues/2679) - blocking call in long-running connects ([#2680 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2680))

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2874,6 +2874,21 @@ IEnumerable<SortedSetEntry> SortedSetScan(
28742874
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
28752875
StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
28762876

2877+
/// <summary>
2878+
/// View information about each pending message.
2879+
/// </summary>
2880+
/// <param name="key">The key of the stream.</param>
2881+
/// <param name="groupName">The name of the consumer group.</param>
2882+
/// <param name="count">The maximum number of pending messages to return.</param>
2883+
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
2884+
/// <param name="minId">The minimum ID from which to read the stream of pending messages. Pass null to read from the beginning of the stream.</param>
2885+
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. Pass null to read to the end of the stream.</param>
2886+
/// <param name="flags">The flags to use for this operation.</param>
2887+
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
2888+
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
2889+
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
2890+
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);
2891+
28772892
/// <summary>
28782893
/// View information about each pending message.
28792894
/// </summary>
@@ -2883,11 +2898,12 @@ IEnumerable<SortedSetEntry> SortedSetScan(
28832898
/// <param name="consumerName">The consumer name for the pending messages. Pass RedisValue.Null to include pending messages for all consumers.</param>
28842899
/// <param name="minId">The minimum ID from which to read the stream of pending messages. The method will default to reading from the beginning of the stream.</param>
28852900
/// <param name="maxId">The maximum ID to read to within the stream of pending messages. The method will default to reading to the end of the stream.</param>
2901+
/// <param name="minIdleTimeInMs">The minimum idle time threshold for pending messages to be claimed.</param>
28862902
/// <param name="flags">The flags to use for this operation.</param>
28872903
/// <returns>An instance of <see cref="StreamPendingMessageInfo"/> for each pending message.</returns>
2888-
/// <remarks>Equivalent of calling XPENDING key group start-id end-id count consumer-name.</remarks>
2904+
/// <remarks>Equivalent of calling XPENDING key group IDLE min-idle-time start-id end-id count consumer-name.</remarks>
28892905
/// <remarks><seealso href="https://redis.io/commands/xpending"/></remarks>
2890-
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
2906+
StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
28912907

28922908
/// <summary>
28932909
/// Read a stream using the given range of IDs.

src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,10 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
706706
Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None);
707707

708708
/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, CommandFlags)"/>
709-
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None);
709+
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags);
710+
711+
/// <inheritdoc cref="IDatabase.StreamPendingMessages(RedisKey, RedisValue, int, RedisValue, RedisValue?, RedisValue?, long?, CommandFlags)"/>
712+
Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None);
710713

711714
/// <inheritdoc cref="IDatabase.StreamRange(RedisKey, RedisValue?, RedisValue?, int?, Order, CommandFlags)"/>
712715
Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None);

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -669,9 +669,12 @@ public Task<bool> StreamDeleteConsumerGroupAsync(RedisKey key, RedisValue groupN
669669
public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
670670
Inner.StreamPendingAsync(ToInner(key), groupName, flags);
671671

672-
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
672+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
673673
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
674674

675+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
676+
Inner.StreamPendingMessagesAsync(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);
677+
675678
public Task<StreamEntry[]> StreamRangeAsync(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
676679
Inner.StreamRangeAsync(ToInner(key), minId, maxId, count, messageOrder, flags);
677680

src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -651,9 +651,12 @@ public bool StreamDeleteConsumerGroup(RedisKey key, RedisValue groupName, Comman
651651
public StreamPendingInfo StreamPending(RedisKey key, RedisValue groupName, CommandFlags flags = CommandFlags.None) =>
652652
Inner.StreamPending(ToInner(key), groupName, flags);
653653

654-
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
654+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId, RedisValue? maxId, CommandFlags flags) =>
655655
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, flags);
656656

657+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None) =>
658+
Inner.StreamPendingMessages(ToInner(key), groupName, count, consumerName, minId, maxId, minIdleTimeInMs, flags);
659+
657660
public StreamEntry[] StreamRange(RedisKey key, RedisValue? minId = null, RedisValue? maxId = null, int? count = null, Order messageOrder = Order.Ascending, CommandFlags flags = CommandFlags.None) =>
658661
Inner.StreamRange(ToInner(key), minId, maxId, count, messageOrder, flags);
659662

src/StackExchange.Redis/PublicAPI/PublicAPI.Shipped.txt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,8 @@ StackExchange.Redis.IDatabase.StreamGroupInfo(StackExchange.Redis.RedisKey key,
739739
StackExchange.Redis.IDatabase.StreamInfo(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamInfo
740740
StackExchange.Redis.IDatabase.StreamLength(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
741741
StackExchange.Redis.IDatabase.StreamPending(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingInfo
742-
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
742+
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> StackExchange.Redis.StreamPendingMessageInfo[]!
743+
StackExchange.Redis.IDatabase.StreamPendingMessages(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamPendingMessageInfo[]!
743744
StackExchange.Redis.IDatabase.StreamRange(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
744745
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.StreamEntry[]!
745746
StackExchange.Redis.IDatabase.StreamRead(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> StackExchange.Redis.RedisStream[]!
@@ -983,7 +984,8 @@ StackExchange.Redis.IDatabaseAsync.StreamGroupInfoAsync(StackExchange.Redis.Redi
983984
StackExchange.Redis.IDatabaseAsync.StreamInfoAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamInfo>!
984985
StackExchange.Redis.IDatabaseAsync.StreamLengthAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
985986
StackExchange.Redis.IDatabaseAsync.StreamPendingAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingInfo>!
986-
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
987+
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId, StackExchange.Redis.RedisValue? maxId, StackExchange.Redis.CommandFlags flags) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
988+
StackExchange.Redis.IDatabaseAsync.StreamPendingMessagesAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue groupName, int count, StackExchange.Redis.RedisValue consumerName, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, long? minIdleTimeInMs = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamPendingMessageInfo[]!>!
987989
StackExchange.Redis.IDatabaseAsync.StreamRangeAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue? minId = null, StackExchange.Redis.RedisValue? maxId = null, int? count = null, StackExchange.Redis.Order messageOrder = StackExchange.Redis.Order.Ascending, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
988990
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue position, int? count = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.StreamEntry[]!>!
989991
StackExchange.Redis.IDatabaseAsync.StreamReadAsync(StackExchange.Redis.StreamPosition[]! streamPositions, int? countPerStream = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<StackExchange.Redis.RedisStream[]!>!

src/StackExchange.Redis/RedisDatabase.cs

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3164,7 +3164,10 @@ public Task<StreamPendingInfo> StreamPendingAsync(RedisKey key, RedisValue group
31643164
return ExecuteAsync(msg, ResultProcessor.StreamPendingInfo);
31653165
}
31663166

3167-
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
3167+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
3168+
StreamPendingMessages(key, groupName, count, consumerName, minId, maxId, null, flags);
3169+
3170+
public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
31683171
{
31693172
var msg = GetStreamPendingMessagesMessage(
31703173
key,
@@ -3173,12 +3176,16 @@ public StreamPendingMessageInfo[] StreamPendingMessages(RedisKey key, RedisValue
31733176
maxId,
31743177
count,
31753178
consumerName,
3179+
minIdleTimeInMs,
31763180
flags);
31773181

31783182
return ExecuteSync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
31793183
}
31803184

3181-
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None)
3185+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, CommandFlags flags = CommandFlags.None) =>
3186+
StreamPendingMessagesAsync(key, groupName, count, consumerName, minId, maxId, null, flags);
3187+
3188+
public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key, RedisValue groupName, int count, RedisValue consumerName, RedisValue? minId = null, RedisValue? maxId = null, long? minIdleTimeInMs = null, CommandFlags flags = CommandFlags.None)
31823189
{
31833190
var msg = GetStreamPendingMessagesMessage(
31843191
key,
@@ -3187,6 +3194,7 @@ public Task<StreamPendingMessageInfo[]> StreamPendingMessagesAsync(RedisKey key,
31873194
maxId,
31883195
count,
31893196
consumerName,
3197+
minIdleTimeInMs,
31903198
flags);
31913199

31923200
return ExecuteAsync(msg, ResultProcessor.StreamPendingMessages, defaultValue: Array.Empty<StreamPendingMessageInfo>());
@@ -4717,9 +4725,9 @@ private Message GetStreamCreateConsumerGroupMessage(RedisKey key, RedisValue gro
47174725
/// Gets a message for <see href="https://redis.io/commands/xpending/"/>.
47184726
/// </summary>
47194727
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
4720-
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, CommandFlags flags)
4728+
private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupName, RedisValue? minId, RedisValue? maxId, int count, RedisValue consumerName, long? minIdleTimeInMs, CommandFlags flags)
47214729
{
4722-
// > XPENDING mystream mygroup - + 10 [consumer name]
4730+
// > XPENDING mystream mygroup [IDLE min-idle-time] - + 10 [consumer name]
47234731
// 1) 1) 1526569498055 - 0
47244732
// 2) "Bob"
47254733
// 3) (integer)74170458
@@ -4733,16 +4741,33 @@ private Message GetStreamPendingMessagesMessage(RedisKey key, RedisValue groupNa
47334741
throw new ArgumentOutOfRangeException(nameof(count), "count must be greater than 0.");
47344742
}
47354743

4736-
var values = new RedisValue[consumerName == RedisValue.Null ? 4 : 5];
4744+
var valuesLength = 4;
4745+
if (consumerName != RedisValue.Null)
4746+
{
4747+
valuesLength++;
4748+
}
47374749

4738-
values[0] = groupName;
4739-
values[1] = minId ?? StreamConstants.ReadMinValue;
4740-
values[2] = maxId ?? StreamConstants.ReadMaxValue;
4741-
values[3] = count;
4750+
if (minIdleTimeInMs is not null)
4751+
{
4752+
valuesLength += 2;
4753+
}
4754+
var values = new RedisValue[valuesLength];
4755+
4756+
var offset = 0;
4757+
4758+
values[offset++] = groupName;
4759+
if (minIdleTimeInMs is not null)
4760+
{
4761+
values[offset++] = "IDLE";
4762+
values[offset++] = minIdleTimeInMs;
4763+
}
4764+
values[offset++] = minId ?? StreamConstants.ReadMinValue;
4765+
values[offset++] = maxId ?? StreamConstants.ReadMaxValue;
4766+
values[offset++] = count;
47424767

47434768
if (consumerName != RedisValue.Null)
47444769
{
4745-
values[4] = consumerName;
4770+
values[offset++] = consumerName;
47464771
}
47474772

47484773
return Message.Create(

0 commit comments

Comments
 (0)