Skip to content

Commit dc026c7

Browse files
garyrussell authored and artembilan committed
GH-2576: Fix No Seeks After Error Handling
Resolves #2576. When the error handler's `seeksAfterHandling()` returns false, we keep the unprocessed records in memory (`remainingRecords`), pause the consumer, and attempt redelivery. If a rebalance occurs during this state, the behavior depends on whether a legacy or cooperative partition assignor is used. With a legacy assignor, all partitions are unassigned and usually a subset is reassigned. This causes their positions to be reset to the last committed offset. In this case, all retained records must be cleared, otherwise an emergency stop occurs because records are polled when we don't expect them. With a cooperative assignor, only a subset of the partitions is revoked. Records for these partitions must be pruned from the `remainingRecords` and not redelivered to the listener. - add code to the rebalance listener to prune the revoked partitions from the retained records - re-pause any assigned partitions if retained records are present Add tests for both types of assignor. Also tested with the reporter's reproducer. **cherry-pick to 2.9.x** * Fix typo. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent 5e0a7ac commit dc026c7

File tree

2 files changed

+237
-2
lines changed

2 files changed

+237
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2561,7 +2561,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
25612561

25622562
private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
25632563
if (isPaused() && this.pauseImmediate) {
2564-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new HashMap<>();
2564+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new LinkedHashMap<>();
25652565
while (iterator.hasNext()) {
25662566
ConsumerRecord<K, V> next = iterator.next();
25672567
remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
@@ -3437,6 +3437,7 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
34373437
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
34383438
try {
34393439
this.revoked.addAll(partitions);
3440+
removeRevocationsFromPending(partitions);
34403441
if (this.consumerAwareListener != null) {
34413442
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
34423443
partitions);
@@ -3482,6 +3483,23 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
34823483
}
34833484
}
34843485

3486+
private void removeRevocationsFromPending(Collection<TopicPartition> partitions) {
3487+
ConsumerRecords<K, V> remaining = ListenerConsumer.this.remainingRecords;
3488+
if (remaining != null && !partitions.isEmpty()) {
3489+
Set<TopicPartition> remainingParts = new LinkedHashSet<>(remaining.partitions());
3490+
remainingParts.removeAll(partitions);
3491+
if (!remainingParts.isEmpty()) {
3492+
Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new LinkedHashMap<>();
3493+
remainingParts.forEach(part -> trimmed.computeIfAbsent(part, tp -> remaining.records(tp)));
3494+
ListenerConsumer.this.remainingRecords = new ConsumerRecords<>(trimmed);
3495+
}
3496+
else {
3497+
ListenerConsumer.this.remainingRecords = null;
3498+
}
3499+
ListenerConsumer.this.logger.debug(() -> "Removed " + partitions + " from remaining records");
3500+
}
3501+
}
3502+
34853503
@Override
34863504
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
34873505
repauseIfNeeded(partitions);
@@ -3513,7 +3531,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35133531
}
35143532

35153533
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3516-
if (isPaused()) {
3534+
if (isPaused() || ListenerConsumer.this.remainingRecords != null && !partitions.isEmpty()) {
35173535
ListenerConsumer.this.consumer.pause(partitions);
35183536
ListenerConsumer.this.consumerPaused = true;
35193537
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.eq;
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.atLeastOnce;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
@@ -33,6 +34,7 @@
3334
import java.util.Collection;
3435
import java.util.Collections;
3536
import java.util.HashMap;
37+
import java.util.LinkedHashMap;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.Set;
@@ -887,6 +889,221 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
887889
assertThat(child.isPartitionPauseRequested(tp0)).isFalse();
888890
rebal.get().onPartitionsAssigned(assignments);
889891
verify(consumer, times(2)).pause(any()); // no immediate pause this time
892+
container.stop();
893+
}
894+
895+
/**
 * Simulates a rebalance in which every partition is revoked and only a subset
 * (tp1, tp3) is assigned again, while the listener always throws and the error
 * handler reports {@code seeksAfterHandling() == false}. Asserts that the child
 * container's {@code listenerConsumer.remainingRecords} is {@code null} after the
 * rebalance and that the mock consumer is paused exactly once and resumed exactly once.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
896+
@Test
897+
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLagacyAssignor() throws InterruptedException {
898+
TopicPartition tp0 = new TopicPartition("foo", 0);
899+
TopicPartition tp1 = new TopicPartition("foo", 1);
900+
TopicPartition tp2 = new TopicPartition("foo", 2);
901+
TopicPartition tp3 = new TopicPartition("foo", 3);
902+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
903+
// one record per partition, returned by the first poll
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
904+
allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
905+
allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
906+
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
907+
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
908+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
909+
// partitions (and their records) that are handed back after the full revocation
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1, tp3);
910+
Map<TopicPartition, List<ConsumerRecord<String, String>>> afterRevokeRecordMap = new HashMap<>();
911+
afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
912+
afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
913+
ConsumerRecords afterRevokeRecords = new ConsumerRecords<>(afterRevokeRecordMap);
914+
// counts poll() invocations so the stubbed poll can script a different step per call
AtomicInteger pollPhase = new AtomicInteger();
915+
916+
Consumer consumer = mock(Consumer.class);
917+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
918+
CountDownLatch subscribeLatch = new CountDownLatch(1);
919+
// capture the rebalance listener passed to subscribe() so the test can drive it by hand
willAnswer(invocation -> {
920+
rebal.set(invocation.getArgument(1));
921+
subscribeLatch.countDown();
922+
return null;
923+
}).given(consumer).subscribe(any(Collection.class), any());
924+
CountDownLatch pauseLatch = new CountDownLatch(1);
925+
AtomicBoolean paused = new AtomicBoolean();
926+
// track the mock consumer's paused state; the stubbed poll reads it below
willAnswer(inv -> {
927+
paused.set(true);
928+
pauseLatch.countDown();
929+
return null;
930+
}).given(consumer).pause(any());
931+
CountDownLatch resumeLatch = new CountDownLatch(1);
932+
willAnswer(inv -> {
933+
paused.set(false);
934+
resumeLatch.countDown();
935+
return null;
936+
}).given(consumer).resume(any());
937+
ConsumerFactory cf = mock(ConsumerFactory.class);
938+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
939+
given(cf.getConfigurationProperties())
940+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
941+
ContainerProperties containerProperties = new ContainerProperties("foo");
942+
containerProperties.setGroupId("grp");
943+
// the listener fails on every record
containerProperties.setMessageListener((MessageListener) rec -> {
944+
throw new RuntimeException("test");
945+
});
946+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
947+
containerProperties);
948+
container.setCommonErrorHandler(new CommonErrorHandler() {
949+
950+
@Override
951+
public boolean seeksAfterHandling() {
952+
return false; // pause and use remainingRecords
953+
}
954+
955+
@Override
956+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
957+
MessageListenerContainer container) {
958+
959+
return false; // not handled
960+
}
961+
962+
});
963+
CountDownLatch pollLatch = new CountDownLatch(2);
964+
CountDownLatch rebalLatch = new CountDownLatch(1);
965+
CountDownLatch continueLatch = new CountDownLatch(1);
966+
// Scripted poll(): call 0 assigns all four partitions and returns one record for each;
// call 1 revokes all of them, assigns tp1 and tp3, signals rebalLatch and then waits
// for the test thread; every later call returns empty records while paused,
// otherwise the tp1/tp3 records.
willAnswer(inv -> {
967+
Thread.sleep(50);
968+
pollLatch.countDown();
969+
switch (pollPhase.getAndIncrement()) {
970+
case 0:
971+
rebal.get().onPartitionsAssigned(allAssignments);
972+
return allRecords;
973+
case 1:
974+
rebal.get().onPartitionsRevoked(allAssignments);
975+
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
976+
rebalLatch.countDown();
977+
// no break/return after this wait, so case 1 falls through into default
continueLatch.await(10, TimeUnit.SECONDS);
978+
default:
979+
if (paused.get()) {
980+
return ConsumerRecords.empty();
981+
}
982+
return afterRevokeRecords;
983+
}
984+
}).given(consumer).poll(any());
985+
container.start();
986+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
987+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
988+
.getPropertyValue(container, "containers", List.class).get(0);
989+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
990+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
991+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
992+
// inspected after the rebalance callbacks ran and before continueLatch releases the second poll
ConsumerRecords remaining = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.remainingRecords",
993+
ConsumerRecords.class);
994+
assertThat(remaining).isNull();
995+
continueLatch.countDown();
996+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
997+
// no pause when re-assigned because all revoked and remainingRecords == null
998+
verify(consumer).pause(any());
999+
verify(consumer).resume(any());
1000+
container.stop();
1001+
}
1002+
1003+
/**
 * Simulates a rebalance in which only tp0 and tp2 are revoked and nothing new is
 * assigned, while the listener always throws and the error handler reports
 * {@code seeksAfterHandling() == false}. Asserts that the child container's
 * {@code listenerConsumer.remainingRecords} then holds two records covering tp1 and tp3,
 * that the mock consumer is paused but never resumed, and that the listener received
 * record0 first and then record1 for the next two deliveries.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
1004+
@Test
1005+
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssignor() throws InterruptedException {
1006+
TopicPartition tp0 = new TopicPartition("foo", 0);
1007+
TopicPartition tp1 = new TopicPartition("foo", 1);
1008+
TopicPartition tp2 = new TopicPartition("foo", 2);
1009+
TopicPartition tp3 = new TopicPartition("foo", 3);
1010+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
1011+
// insertion-ordered map: tp0 is first, then tp1, tp2, tp3
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new LinkedHashMap<>();
1012+
ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar");
1013+
ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar");
1014+
allRecordMap.put(tp0, Collections.singletonList(record0));
1015+
allRecordMap.put(tp1, Collections.singletonList(record1));
1016+
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
1017+
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
1018+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
1019+
// only these two partitions are taken away during the simulated rebalance
List<TopicPartition> revokedAssignments = Arrays.asList(tp0, tp2);
1020+
// counts poll() invocations so the stubbed poll can script a different step per call
AtomicInteger pollPhase = new AtomicInteger();
1021+
1022+
Consumer consumer = mock(Consumer.class);
1023+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
1024+
CountDownLatch subscribeLatch = new CountDownLatch(1);
1025+
// capture the rebalance listener passed to subscribe() so the test can drive it by hand
willAnswer(invocation -> {
1026+
rebal.set(invocation.getArgument(1));
1027+
subscribeLatch.countDown();
1028+
return null;
1029+
}).given(consumer).subscribe(any(Collection.class), any());
1030+
CountDownLatch pauseLatch = new CountDownLatch(1);
1031+
AtomicBoolean paused = new AtomicBoolean();
1032+
willAnswer(inv -> {
1033+
paused.set(true);
1034+
pauseLatch.countDown();
1035+
return null;
1036+
}).given(consumer).pause(any());
1037+
ConsumerFactory cf = mock(ConsumerFactory.class);
1038+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
1039+
given(cf.getConfigurationProperties())
1040+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
1041+
ContainerProperties containerProperties = new ContainerProperties("foo");
1042+
containerProperties.setGroupId("grp");
1043+
List<ConsumerRecord> recordsDelivered = new ArrayList<>();
1044+
CountDownLatch consumeLatch = new CountDownLatch(3);
1045+
// the listener records each delivery and then fails, so every delivery goes to the error handler
containerProperties.setMessageListener((MessageListener) rec -> {
1046+
recordsDelivered.add((ConsumerRecord) rec);
1047+
consumeLatch.countDown();
1048+
throw new RuntimeException("test");
1049+
});
1050+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
1051+
containerProperties);
1052+
container.setCommonErrorHandler(new CommonErrorHandler() {
1053+
1054+
@Override
1055+
public boolean seeksAfterHandling() {
1056+
return false; // pause and use remainingRecords
1057+
}
1058+
1059+
@Override
1060+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
1061+
MessageListenerContainer container) {
1062+
1063+
return false; // not handled
1064+
}
1065+
1066+
});
1067+
CountDownLatch pollLatch = new CountDownLatch(2);
1068+
CountDownLatch rebalLatch = new CountDownLatch(1);
1069+
CountDownLatch continueLatch = new CountDownLatch(1);
1070+
// Scripted poll(): call 0 assigns all four partitions and returns one record for each;
// call 1 revokes tp0 and tp2, reports an empty assignment, signals rebalLatch and then
// waits for the test thread; that call and every later one return empty records.
willAnswer(inv -> {
1071+
Thread.sleep(50);
1072+
pollLatch.countDown();
1073+
switch (pollPhase.getAndIncrement()) {
1074+
case 0:
1075+
rebal.get().onPartitionsAssigned(allAssignments);
1076+
return allRecords;
1077+
case 1:
1078+
rebal.get().onPartitionsRevoked(revokedAssignments);
1079+
rebal.get().onPartitionsAssigned(Collections.emptyList());
1080+
rebalLatch.countDown();
1081+
// no break/return after this wait, so case 1 falls through into default
continueLatch.await(10, TimeUnit.SECONDS);
1082+
default:
1083+
return ConsumerRecords.empty();
1084+
}
1085+
}).given(consumer).poll(any());
1086+
container.start();
1087+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1088+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
1089+
.getPropertyValue(container, "containers", List.class).get(0);
1090+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
1091+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
1092+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
1093+
// inspected after the rebalance callbacks ran and before continueLatch releases the second poll
ConsumerRecords remaining = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.remainingRecords",
1094+
ConsumerRecords.class);
1095+
assertThat(remaining.count()).isEqualTo(2);
1096+
assertThat(remaining.partitions()).contains(tp1, tp3);
1097+
continueLatch.countDown();
1098+
verify(consumer, atLeastOnce()).pause(any());
1099+
verify(consumer, never()).resume(any());
1100+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1101+
container.stop();
1102+
assertThat(recordsDelivered).hasSizeGreaterThanOrEqualTo(3);
1103+
// partitions 0, 2 revoked during second poll.
1104+
assertThat(recordsDelivered.get(0)).isEqualTo(record0);
1105+
assertThat(recordsDelivered.get(1)).isEqualTo(record1);
1106+
assertThat(recordsDelivered.get(2)).isEqualTo(record1);
8901107
}
8911108

8921109
public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

0 commit comments

Comments
 (0)