Skip to content

Commit fecf9fd

Browse files
mikael-carlstedtsobychacko
authored andcommitted
GH-3186: Filter out successful commits from retry
Fixes: #3186 * Filter out successful commits from the retry * Don't retry failed commits due to rebalance if successful commits subsequently supersede them * Modify rebalance test cases to capture the scenario that triggers the bug **Auto-cherry-pick to `3.1.x` & `3.0.x`**
1 parent cc5451d commit fecf9fd

File tree

2 files changed

+43
-14
lines changed

2 files changed

+43
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,8 @@
161161
* @author Daniel Gentes
162162
* @author Soby Chacko
163163
* @author Raphael Rösch
164+
* @author Christian Mergenthaler
165+
* @author Mikael Carlstedt
164166
*/
165167
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
166168
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -3299,6 +3301,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
32993301
if (this.fixTxOffsets) {
33003302
this.lastCommits.putAll(commits);
33013303
}
3304+
if (!this.commitsDuringRebalance.isEmpty()) {
3305+
// Remove failed commits during last rebalance that are superseded by these commits
3306+
this.commitsDuringRebalance.keySet().removeAll(commits.keySet());
3307+
}
33023308
}
33033309
catch (RetriableCommitFailedException e) {
33043310
if (retries >= this.containerProperties.getCommitRetries()) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@
140140
* @author Lukasz Kaminski
141141
* @author Ray Chuan Tay
142142
* @author Daniel Gentes
143+
* @author Soby Chacko
144+
* @author Wang Zhiyang
145+
* @author Mikael Carlstedt
143146
*/
144147
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
145148
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -3474,25 +3477,33 @@ public void testCooperativeRebalance() throws Exception {
34743477

34753478
@Test
34763479
void testCommitRebalanceInProgressBatch() throws Exception {
3477-
testCommitRebalanceInProgressGuts(AckMode.BATCH, 2, commits -> {
3478-
assertThat(commits).hasSize(3);
3480+
testCommitRebalanceInProgressGuts(AckMode.BATCH, 3, commits -> {
3481+
assertThat(commits).hasSize(5);
34793482
assertThat(commits.get(0)).hasSize(2); // assignment
3480-
assertThat(commits.get(1)).hasSize(2); // batch commit
3481-
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3483+
assertThat(commits.get(1)).hasSize(2); // batch commit which should fail due to rebalance in progress
3484+
assertThat(commits.get(2)).hasSize(2); // commit retry which should fail due to rebalance in progress
3485+
assertThat(commits.get(3)).hasSize(1); // GH-3186: additional batch commit with only one partition which should be successful
3486+
assertThat(commits.get(4)).hasSize(1); // GH-2489: offsets for both uncommitted partition should be re-committed before partition 0 is revoked
3487+
assertThat(commits.get(4).get(new TopicPartition("foo", 0)))
3488+
.isNotNull()
3489+
.extracting(OffsetAndMetadata::offset)
3490+
.isEqualTo(2L);
34823491
});
34833492
}
34843493

34853494
@Test
34863495
void testCommitRebalanceInProgressRecord() throws Exception {
3487-
testCommitRebalanceInProgressGuts(AckMode.RECORD, 5, commits -> {
3488-
assertThat(commits).hasSize(6);
3496+
testCommitRebalanceInProgressGuts(AckMode.RECORD, 6, commits -> {
3497+
assertThat(commits).hasSize(8);
34893498
assertThat(commits.get(0)).hasSize(2); // assignment
3490-
assertThat(commits.get(1)).hasSize(1); // 4 individual commits
3499+
assertThat(commits.get(1)).hasSize(1); // 4 individual commits which should fail due to rebalance in progress
34913500
assertThat(commits.get(2)).hasSize(1);
34923501
assertThat(commits.get(3)).hasSize(1);
34933502
assertThat(commits.get(4)).hasSize(1);
3494-
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3495-
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
3503+
assertThat(commits.get(5)).hasSize(2); // commit retry which should fail due to rebalance in progress
3504+
assertThat(commits.get(6)).hasSize(1); // GH-3186: additional commit which should be successful
3505+
assertThat(commits.get(7)).hasSize(1); // GH-2489: offsets for both partition should be re-committed before partition 0 is revoked
3506+
assertThat(commits.get(7).get(new TopicPartition("foo", 0)))
34963507
.isNotNull()
34973508
.extracting(om -> om.offset())
34983509
.isEqualTo(2L);
@@ -3516,25 +3527,37 @@ private void testCommitRebalanceInProgressGuts(AckMode ackMode, int exceptions,
35163527
records.put(new TopicPartition("foo", 1), Arrays.asList(
35173528
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
35183529
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
3530+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> additionalRecords = Collections.singletonMap(
3531+
new TopicPartition("foo", 1),
3532+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 2L, 1, "foo")));
35193533
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3534+
ConsumerRecords<Integer, String> additionalConsumerRecords = new ConsumerRecords<>(additionalRecords);
35203535
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3521-
AtomicBoolean first = new AtomicBoolean(true);
3522-
AtomicInteger rebalance = new AtomicInteger();
3536+
AtomicInteger pollIteration = new AtomicInteger();
35233537
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
3524-
CountDownLatch latch = new CountDownLatch(2);
3538+
CountDownLatch latch = new CountDownLatch(3);
35253539
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
35263540
Thread.sleep(50);
3527-
int call = rebalance.getAndIncrement();
3541+
int call = pollIteration.getAndIncrement();
3542+
final ConsumerRecords<Integer, String> result;
35283543
if (call == 0) {
35293544
rebal.get().onPartitionsRevoked(Collections.emptyList());
35303545
rebal.get().onPartitionsAssigned(records.keySet());
3546+
result = consumerRecords;
35313547
}
35323548
else if (call == 1) {
3549+
result = additionalConsumerRecords;
3550+
}
3551+
else if (call == 2) {
35333552
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
35343553
rebal.get().onPartitionsAssigned(Collections.emptyList());
3554+
result = emptyRecords;
3555+
}
3556+
else {
3557+
result = emptyRecords;
35353558
}
35363559
latch.countDown();
3537-
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3560+
return result;
35383561
});
35393562
willAnswer(invoc -> {
35403563
rebal.set(invoc.getArgument(1));

0 commit comments

Comments
 (0)