|
18 | 18 | import io.lettuce.core.XAddArgs;
|
19 | 19 | import io.lettuce.core.XClaimArgs;
|
20 | 20 | import io.lettuce.core.XGroupCreateArgs;
|
| 21 | +import io.lettuce.core.XPendingArgs; |
21 | 22 | import io.lettuce.core.XReadArgs;
|
22 | 23 | import io.lettuce.core.XReadArgs.StreamOffset;
|
23 | 24 | import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
|
|
56 | 57 | * @author Tugdual Grall
|
57 | 58 | * @author Dengliming
|
58 | 59 | * @author Mark John Moreno
|
| 60 | + * @author Jeonggyu Choi |
59 | 61 | * @since 2.2
|
60 | 62 | */
|
61 | 63 | class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
|
@@ -235,9 +237,16 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
|
235 | 237 | io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount())
|
236 | 238 | : io.lettuce.core.Limit.unlimited();
|
237 | 239 |
|
238 |
| - Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), |
239 |
| - io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) |
240 |
| - : cmd.xpending(command.getKey(), groupName, range, limit); |
| 240 | + XPendingArgs<ByteBuffer> xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit); |
| 241 | + if (command.hasConsumer()) { |
| 242 | + io.lettuce.core.Consumer<ByteBuffer> consumer = io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())); |
| 243 | + xPendingArgs.consumer(consumer); |
| 244 | + } |
| 245 | + if (command.hasIdle()) { |
| 246 | + xPendingArgs.idle(command.getIdle()); |
| 247 | + } |
| 248 | + |
| 249 | + Flux<PendingMessage> publisher = cmd.xpending(command.getKey(), xPendingArgs); |
241 | 250 |
|
242 | 251 | return publisher.collectList().map(it -> {
|
243 | 252 |
|
|
0 commit comments