Skip to content

Commit a7e7bf7

Browse files
GH-3186: Filter out successful commits from retry
Fixes: #3186

* Filter out successful commits from the retry
* Don't retry failed commits due to rebalance if successful commits subsequently supersede them
* Modify rebalance test cases to capture the scenario that triggers the bug

**Auto-cherry-pick to `3.1.x` & `3.0.x`**
1 parent 41c342a commit a7e7bf7

File tree

2 files changed

+40
-14
lines changed

2 files changed

+40
-14
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@
164164
* @author Wang Zhiyang
165165
* @author Raphael Rösch
166166
* @author Christian Mergenthaler
167+
* @author Mikael Carlstedt
167168
*/
168169
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
169170
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -3227,6 +3228,10 @@ private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int re
32273228
if (this.fixTxOffsets) {
32283229
this.lastCommits.putAll(commits);
32293230
}
3231+
if (!this.commitsDuringRebalance.isEmpty()) {
3232+
// Remove failed commits during last rebalance that are superseded by these commits
3233+
this.commitsDuringRebalance.keySet().removeAll(commits.keySet());
3234+
}
32303235
}
32313236
catch (RetriableCommitFailedException e) {
32323237
if (retries >= this.containerProperties.getCommitRetries()) {

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@
143143
* @author Daniel Gentes
144144
* @author Soby Chacko
145145
* @author Wang Zhiyang
146+
* @author Mikael Carlstedt
146147
*/
147148
@EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2,
148149
KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4,
@@ -3472,25 +3473,33 @@ public void testCooperativeRebalance() throws Exception {
34723473

34733474
@Test
34743475
void testCommitRebalanceInProgressBatch() throws Exception {
3475-
testCommitRebalanceInProgressGuts(AckMode.BATCH, 2, commits -> {
3476-
assertThat(commits).hasSize(3);
3476+
testCommitRebalanceInProgressGuts(AckMode.BATCH, 3, commits -> {
3477+
assertThat(commits).hasSize(5);
34773478
assertThat(commits.get(0)).hasSize(2); // assignment
3478-
assertThat(commits.get(1)).hasSize(2); // batch commit
3479-
assertThat(commits.get(2)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3479+
assertThat(commits.get(1)).hasSize(2); // batch commit which should fail due to rebalance in progress
3480+
assertThat(commits.get(2)).hasSize(2); // commit retry which should fail due to rebalance in progress
3481+
assertThat(commits.get(3)).hasSize(1); // GH-3186: additional batch commit with only one partition which should be successful
3482+
assertThat(commits.get(4)).hasSize(1); // GH-2489: the offset for the still-uncommitted partition should be re-committed before partition 0 is revoked
3483+
assertThat(commits.get(4).get(new TopicPartition("foo", 0)))
3484+
.isNotNull()
3485+
.extracting(OffsetAndMetadata::offset)
3486+
.isEqualTo(2L);
34803487
});
34813488
}
34823489

34833490
@Test
34843491
void testCommitRebalanceInProgressRecord() throws Exception {
3485-
testCommitRebalanceInProgressGuts(AckMode.RECORD, 5, commits -> {
3486-
assertThat(commits).hasSize(6);
3492+
testCommitRebalanceInProgressGuts(AckMode.RECORD, 6, commits -> {
3493+
assertThat(commits).hasSize(8);
34873494
assertThat(commits.get(0)).hasSize(2); // assignment
3488-
assertThat(commits.get(1)).hasSize(1); // 4 individual commits
3495+
assertThat(commits.get(1)).hasSize(1); // 4 individual commits which should fail due to rebalance in progress
34893496
assertThat(commits.get(2)).hasSize(1);
34903497
assertThat(commits.get(3)).hasSize(1);
34913498
assertThat(commits.get(4)).hasSize(1);
3492-
assertThat(commits.get(5)).hasSize(2); // GH-2489: offsets for both partition should be re-committed before partition 1 is revoked
3493-
assertThat(commits.get(5).get(new TopicPartition("foo", 1)))
3499+
assertThat(commits.get(5)).hasSize(2); // commit retry which should fail due to rebalance in progress
3500+
assertThat(commits.get(6)).hasSize(1); // GH-3186: additional commit which should be successful
3501+
assertThat(commits.get(7)).hasSize(1); // GH-2489: the offset for the still-uncommitted partition should be re-committed before partition 0 is revoked
3502+
assertThat(commits.get(7).get(new TopicPartition("foo", 0)))
34943503
.isNotNull()
34953504
.extracting(OffsetAndMetadata::offset)
34963505
.isEqualTo(2L);
@@ -3514,25 +3523,37 @@ private void testCommitRebalanceInProgressGuts(AckMode ackMode, int exceptions,
35143523
records.put(new TopicPartition("foo", 1), Arrays.asList(
35153524
new ConsumerRecord<>("foo", 1, 0L, 1, "foo"),
35163525
new ConsumerRecord<>("foo", 1, 1L, 1, "bar")));
3526+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> additionalRecords = Collections.singletonMap(
3527+
new TopicPartition("foo", 1),
3528+
Collections.singletonList(new ConsumerRecord<>("foo", 1, 2L, 1, "foo")));
35173529
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
3530+
ConsumerRecords<Integer, String> additionalConsumerRecords = new ConsumerRecords<>(additionalRecords);
35183531
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
3519-
AtomicBoolean first = new AtomicBoolean(true);
3520-
AtomicInteger rebalance = new AtomicInteger();
3532+
AtomicInteger pollIteration = new AtomicInteger();
35213533
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
3522-
CountDownLatch latch = new CountDownLatch(2);
3534+
CountDownLatch latch = new CountDownLatch(3);
35233535
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
35243536
Thread.sleep(50);
3525-
int call = rebalance.getAndIncrement();
3537+
int call = pollIteration.getAndIncrement();
3538+
final ConsumerRecords<Integer, String> result;
35263539
if (call == 0) {
35273540
rebal.get().onPartitionsRevoked(Collections.emptyList());
35283541
rebal.get().onPartitionsAssigned(records.keySet());
3542+
result = consumerRecords;
35293543
}
35303544
else if (call == 1) {
3545+
result = additionalConsumerRecords;
3546+
}
3547+
else if (call == 2) {
35313548
rebal.get().onPartitionsRevoked(Collections.singletonList(topicPartition0));
35323549
rebal.get().onPartitionsAssigned(Collections.emptyList());
3550+
result = emptyRecords;
3551+
}
3552+
else {
3553+
result = emptyRecords;
35333554
}
35343555
latch.countDown();
3535-
return first.getAndSet(false) ? consumerRecords : emptyRecords;
3556+
return result;
35363557
});
35373558
willAnswer(invoc -> {
35383559
rebal.set(invoc.getArgument(1));

0 commit comments

Comments
 (0)