Skip to content

Commit 0814cff

Browse files
truepele authored and garyrussell committed
GH-2542: FallbackBatchErrorHandler Improvement
Resolves #2542

Fix retry logic that causes the consumer to leave the group.

**Short problem description**: the consumer leaves the group when the combined processing + backoff time exceeds `max.poll.interval.ms`.

**Root cause**: The retry logic in `ErrorHandlingUtils` does not call `poll()` (on the paused consumer) before retrying the listener runnable; it calls `poll()` only before backing off. So if the backoff interval plus the duration of the following retried execution is longer than `max.poll.interval.ms`, the consumer leaves the group.

**Solution**: Amend `ErrorHandlingUtils` to make an extra call to `consumer.poll` right before retrying the listener runnable.

* cleanup
* fix import style
* fix import style
* cleanup
1 parent 6db6ac7 commit 0814cff

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2022 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
* Utilities for error handling.
3838
*
3939
* @author Gary Russell
40+
* @author Andrii Pelesh
4041
* @since 2.8
4142
*
4243
*/
@@ -144,6 +145,7 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
144145
if (!container.isRunning()) {
145146
throw new KafkaException("Container stopped during retries");
146147
}
148+
consumer.poll(Duration.ZERO);
147149
try {
148150
invokeListener.run();
149151
return;

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 7 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -83,7 +83,7 @@ void recover() {
8383
assertThat(this.invoked).isEqualTo(3);
8484
assertThat(recovered).hasSize(2);
8585
verify(consumer).pause(any());
86-
verify(consumer, times(3)).poll(any());
86+
verify(consumer, times(2 * this.invoked)).poll(any());
8787
verify(consumer).resume(any());
8888
verify(consumer, times(2)).assignment();
8989
verifyNoMoreInteractions(consumer);
@@ -110,7 +110,7 @@ void successOnRetry() {
110110
assertThat(this.invoked).isEqualTo(1);
111111
assertThat(recovered).hasSize(0);
112112
verify(consumer).pause(any());
113-
verify(consumer).poll(any());
113+
verify(consumer, times(2)).poll(any());
114114
verify(consumer).resume(any());
115115
verify(consumer, times(2)).assignment();
116116
verifyNoMoreInteractions(consumer);
@@ -142,7 +142,7 @@ void recoveryFails() {
142142
assertThat(this.invoked).isEqualTo(3);
143143
assertThat(recovered).hasSize(1);
144144
verify(consumer).pause(any());
145-
verify(consumer, times(3)).poll(any());
145+
verify(consumer, times(2 * this.invoked)).poll(any());
146146
verify(consumer).resume(any());
147147
verify(consumer, times(2)).assignment();
148148
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
@@ -211,9 +211,11 @@ void rePauseOnRebalance() {
211211
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
212212
inOrder.verify(consumer).poll(any());
213213
inOrder.verify(consumer).pause(any());
214+
inOrder.verify(consumer).poll(any());
215+
inOrder.verify(consumer).pause(any());
214216
inOrder.verify(consumer).resume(any());
215217
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
216-
verify(consumer, times(3)).assignment();
218+
verify(consumer, times(4)).assignment();
217219
verifyNoMoreInteractions(consumer);
218220
assertThat(pubPauseCalled.get()).isTrue();
219221
}

0 commit comments

Comments
 (0)