Skip to content

Commit 2e2c6d6

Browse files
authored
GH-2576: Fix No Seeks After Error Handling
Resolves #2576. When the error handler's `seeksAfterHandling()` returns false, we keep the unprocessed records in memory (`remainingRecords`), pause the consumer, and attempt redelivery. If a rebalance occurs during this state, the behavior depends on whether a legacy or cooperative partition assignor is used. With a legacy assignor, all partitions are unassigned and usually a subset is reassigned. This causes their positions to be reset to the last committed offset. In this case, all retained records must be cleared; otherwise an emergency stop occurs because records are polled when we don't expect them. With a cooperative assignor, only a subset of the partitions is revoked. Records for these partitions must be pruned from `remainingRecords` and not redelivered to the listener.
- add code to the rebalance listener to prune the revoked partitions from the retained records
- re-pause any assigned partitions if retained records are present
Add tests for both types of assignor. Also tested with the reporter's reproducer. **cherry-pick to 2.9.x** * Fix typo.
1 parent b8cbdbe commit 2e2c6d6

File tree

2 files changed

+237
-2
lines changed

2 files changed

+237
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2643,7 +2643,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
26432643

26442644
private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
26452645
if (isPaused() && this.pauseImmediate) {
2646-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new HashMap<>();
2646+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new LinkedHashMap<>();
26472647
while (iterator.hasNext()) {
26482648
ConsumerRecord<K, V> next = iterator.next();
26492649
remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
@@ -3498,6 +3498,7 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
34983498
@Override
34993499
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
35003500
this.revoked.addAll(partitions);
3501+
removeRevocationsFromPending(partitions);
35013502
if (this.consumerAwareListener != null) {
35023503
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
35033504
partitions);
@@ -3537,6 +3538,23 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
35373538
}
35383539
}
35393540

3541+
/**
 * Prune records that belong to revoked partitions from the listener consumer's
 * {@code remainingRecords}. Invoked from {@code onPartitionsRevoked} with the
 * partitions being revoked.
 * <p>Does nothing when there are no remaining records or when {@code partitions}
 * is empty. Otherwise {@code remainingRecords} is replaced by a new
 * {@code ConsumerRecords} holding only the partitions that were not revoked, or
 * set to {@code null} when every partition with remaining records was revoked.
 * @param partitions the revoked partitions.
 */
private void removeRevocationsFromPending(Collection<TopicPartition> partitions) {
3542+
ConsumerRecords<K, V> remaining = ListenerConsumer.this.remainingRecords;
3543+
if (remaining != null && !partitions.isEmpty()) {
3544+
// Copy into a LinkedHashSet so the iteration order of remaining.partitions() is kept.
Set<TopicPartition> remainingParts = new LinkedHashSet<>(remaining.partitions());
3545+
remainingParts.removeAll(partitions);
3546+
if (!remainingParts.isEmpty()) {
3547+
// Rebuild the retained records from the surviving partitions only.
Map<TopicPartition, List<ConsumerRecord<K, V>>> trimmed = new LinkedHashMap<>();
3548+
remainingParts.forEach(part -> trimmed.computeIfAbsent(part, tp -> remaining.records(tp)));
3549+
ListenerConsumer.this.remainingRecords = new ConsumerRecords<>(trimmed);
3550+
}
3551+
else {
3552+
// Every partition that had remaining records was revoked; nothing is left to redeliver.
ListenerConsumer.this.remainingRecords = null;
3553+
}
3554+
// Logged whenever the guard above passes, even if none of the revoked partitions had records.
ListenerConsumer.this.logger.debug(() -> "Removed " + partitions + " from remaining records");
3555+
}
3556+
}
3557+
35403558
@Override
35413559
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35423560
repauseIfNeeded(partitions);
@@ -3568,7 +3586,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35683586
}
35693587

35703588
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3571-
if (isPaused()) {
3589+
if (isPaused() || ListenerConsumer.this.remainingRecords != null && !partitions.isEmpty()) {
35723590
ListenerConsumer.this.consumer.pause(partitions);
35733591
ListenerConsumer.this.consumerPaused = true;
35743592
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.eq;
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.willAnswer;
25+
import static org.mockito.Mockito.atLeastOnce;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.never;
2728
import static org.mockito.Mockito.times;
@@ -33,6 +34,7 @@
3334
import java.util.Collection;
3435
import java.util.Collections;
3536
import java.util.HashMap;
37+
import java.util.LinkedHashMap;
3638
import java.util.List;
3739
import java.util.Map;
3840
import java.util.Set;
@@ -897,6 +899,221 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
897899
assertThat(child.isPartitionPauseRequested(tp0)).isFalse();
898900
rebal.get().onPartitionsAssigned(assignments);
899901
verify(consumer, times(2)).pause(any()); // no immediate pause this time
902+
container.stop();
903+
}
904+
905+
/**
 * Rebalance test where every partition is revoked and a subset is re-assigned
 * while the container holds remaining records.
 * <p>Poll phase 0 assigns tp0..tp3 and returns one record per partition; the
 * listener always throws and the error handler returns {@code false} from both
 * {@code seeksAfterHandling()} and {@code handleOne(...)}. Poll phase 1 revokes
 * all four partitions and re-assigns tp1 and tp3. The test then asserts that
 * {@code listenerConsumer.remainingRecords} is {@code null} and that
 * {@code pause} and {@code resume} were each verified once on the mock consumer.
 * <p>NOTE(review): "Lagacy" in the method name looks like a typo for "Legacy".
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
906+
@Test
907+
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLagacyAssignor() throws InterruptedException {
908+
TopicPartition tp0 = new TopicPartition("foo", 0);
909+
TopicPartition tp1 = new TopicPartition("foo", 1);
910+
TopicPartition tp2 = new TopicPartition("foo", 2);
911+
TopicPartition tp3 = new TopicPartition("foo", 3);
912+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
913+
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
914+
allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar")));
915+
allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
916+
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
917+
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
918+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
919+
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1, tp3);
920+
Map<TopicPartition, List<ConsumerRecord<String, String>>> afterRevokeRecordMap = new HashMap<>();
921+
afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar")));
922+
afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
923+
ConsumerRecords afterRevokeRecords = new ConsumerRecords<>(afterRevokeRecordMap);
924+
// Counts poll() invocations; selects the branch in the poll answer below.
AtomicInteger pollPhase = new AtomicInteger();
925+
926+
Consumer consumer = mock(Consumer.class);
927+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
928+
CountDownLatch subscribeLatch = new CountDownLatch(1);
929+
// Capture the rebalance listener passed to subscribe() so the test can drive rebalances.
willAnswer(invocation -> {
930+
rebal.set(invocation.getArgument(1));
931+
subscribeLatch.countDown();
932+
return null;
933+
}).given(consumer).subscribe(any(Collection.class), any());
934+
CountDownLatch pauseLatch = new CountDownLatch(1);
935+
// Mirrors the mock consumer's paused state; read by the poll answer.
AtomicBoolean paused = new AtomicBoolean();
936+
willAnswer(inv -> {
937+
paused.set(true);
938+
pauseLatch.countDown();
939+
return null;
940+
}).given(consumer).pause(any());
941+
CountDownLatch resumeLatch = new CountDownLatch(1);
942+
willAnswer(inv -> {
943+
paused.set(false);
944+
resumeLatch.countDown();
945+
return null;
946+
}).given(consumer).resume(any());
947+
ConsumerFactory cf = mock(ConsumerFactory.class);
948+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
949+
given(cf.getConfigurationProperties())
950+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
951+
ContainerProperties containerProperties = new ContainerProperties("foo");
952+
containerProperties.setGroupId("grp");
953+
// The listener fails on every record so the error handler is always invoked.
containerProperties.setMessageListener((MessageListener) rec -> {
954+
throw new RuntimeException("test");
955+
});
956+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
957+
containerProperties);
958+
container.setCommonErrorHandler(new CommonErrorHandler() {
959+
960+
@Override
961+
public boolean seeksAfterHandling() {
962+
return false; // pause and use remainingRecords
963+
}
964+
965+
@Override
966+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
967+
MessageListenerContainer container) {
968+
969+
return false; // not handled
970+
}
971+
972+
});
973+
CountDownLatch pollLatch = new CountDownLatch(2);
974+
CountDownLatch rebalLatch = new CountDownLatch(1);
975+
CountDownLatch continueLatch = new CountDownLatch(1);
976+
willAnswer(inv -> {
977+
Thread.sleep(50);
978+
pollLatch.countDown();
979+
switch (pollPhase.getAndIncrement()) {
980+
case 0:
981+
// First poll: initial assignment of all partitions, one record each.
rebal.get().onPartitionsAssigned(allAssignments);
982+
return allRecords;
983+
case 1:
984+
// Second poll: revoke everything, then re-assign only tp1 and tp3.
rebal.get().onPartitionsRevoked(allAssignments);
985+
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
986+
rebalLatch.countDown();
987+
// Hold the poll until the test thread has inspected remainingRecords.
continueLatch.await(10, TimeUnit.SECONDS);
// No break here: execution falls through to default (appears intentional).
988+
default:
989+
if (paused.get()) {
990+
return ConsumerRecords.empty();
991+
}
992+
return afterRevokeRecords;
993+
}
994+
}).given(consumer).poll(any());
995+
container.start();
996+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
997+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
998+
.getPropertyValue(container, "containers", List.class).get(0);
999+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
1000+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
1001+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
1002+
ConsumerRecords remaining = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.remainingRecords",
1003+
ConsumerRecords.class);
1004+
// All partitions were revoked, so no remaining records are expected.
assertThat(remaining).isNull();
1005+
continueLatch.countDown();
1006+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1007+
// no pause when re-assigned because all revoked and remainingRecords == null
1008+
verify(consumer).pause(any());
1009+
verify(consumer).resume(any());
1010+
container.stop();
1011+
}
1012+
1013+
/**
 * Rebalance test where only some partitions are revoked while the container
 * holds remaining records.
 * <p>Poll phase 0 assigns tp0..tp3 and returns one record per partition (in
 * insertion order, via a {@code LinkedHashMap}); the listener records each
 * delivery and then throws, and the error handler returns {@code false} from both
 * {@code seeksAfterHandling()} and {@code handleOne(...)}. Poll phase 1 revokes
 * tp0 and tp2 and reports an empty new assignment. The test then asserts that
 * {@code listenerConsumer.remainingRecords} holds two records for tp1 and tp3,
 * that {@code resume} is never called, and that the first three deliveries are
 * record0, record1, record1.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
1014+
@Test
1015+
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssignor() throws InterruptedException {
1016+
TopicPartition tp0 = new TopicPartition("foo", 0);
1017+
TopicPartition tp1 = new TopicPartition("foo", 1);
1018+
TopicPartition tp2 = new TopicPartition("foo", 2);
1019+
TopicPartition tp3 = new TopicPartition("foo", 3);
1020+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1, tp2, tp3);
1021+
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new LinkedHashMap<>();
1022+
ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar");
1023+
ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar");
1024+
allRecordMap.put(tp0, Collections.singletonList(record0));
1025+
allRecordMap.put(tp1, Collections.singletonList(record1));
1026+
allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar")));
1027+
allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar")));
1028+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
1029+
List<TopicPartition> revokedAssignments = Arrays.asList(tp0, tp2);
1030+
// Counts poll() invocations; selects the branch in the poll answer below.
AtomicInteger pollPhase = new AtomicInteger();
1031+
1032+
Consumer consumer = mock(Consumer.class);
1033+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
1034+
CountDownLatch subscribeLatch = new CountDownLatch(1);
1035+
// Capture the rebalance listener passed to subscribe() so the test can drive rebalances.
willAnswer(invocation -> {
1036+
rebal.set(invocation.getArgument(1));
1037+
subscribeLatch.countDown();
1038+
return null;
1039+
}).given(consumer).subscribe(any(Collection.class), any());
1040+
CountDownLatch pauseLatch = new CountDownLatch(1);
1041+
AtomicBoolean paused = new AtomicBoolean();
1042+
willAnswer(inv -> {
1043+
paused.set(true);
1044+
pauseLatch.countDown();
1045+
return null;
1046+
}).given(consumer).pause(any());
1047+
ConsumerFactory cf = mock(ConsumerFactory.class);
1048+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
1049+
given(cf.getConfigurationProperties())
1050+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
1051+
ContainerProperties containerProperties = new ContainerProperties("foo");
1052+
containerProperties.setGroupId("grp");
1053+
// Deliveries in arrival order; appended by the listener and asserted after container.stop().
List<ConsumerRecord> recordsDelivered = new ArrayList<>();
1054+
CountDownLatch consumeLatch = new CountDownLatch(3);
1055+
// The listener records the delivery, then fails so the error handler is always invoked.
containerProperties.setMessageListener((MessageListener) rec -> {
1056+
recordsDelivered.add((ConsumerRecord) rec);
1057+
consumeLatch.countDown();
1058+
throw new RuntimeException("test");
1059+
});
1060+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
1061+
containerProperties);
1062+
container.setCommonErrorHandler(new CommonErrorHandler() {
1063+
1064+
@Override
1065+
public boolean seeksAfterHandling() {
1066+
return false; // pause and use remainingRecords
1067+
}
1068+
1069+
@Override
1070+
public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record, Consumer<?, ?> consumer,
1071+
MessageListenerContainer container) {
1072+
1073+
return false; // not handled
1074+
}
1075+
1076+
});
1077+
CountDownLatch pollLatch = new CountDownLatch(2);
1078+
CountDownLatch rebalLatch = new CountDownLatch(1);
1079+
CountDownLatch continueLatch = new CountDownLatch(1);
1080+
willAnswer(inv -> {
1081+
Thread.sleep(50);
1082+
pollLatch.countDown();
1083+
switch (pollPhase.getAndIncrement()) {
1084+
case 0:
1085+
// First poll: initial assignment of all partitions, one record each.
rebal.get().onPartitionsAssigned(allAssignments);
1086+
return allRecords;
1087+
case 1:
1088+
// Second poll: revoke only tp0 and tp2; no new partitions are assigned.
rebal.get().onPartitionsRevoked(revokedAssignments);
1089+
rebal.get().onPartitionsAssigned(Collections.emptyList());
1090+
rebalLatch.countDown();
1091+
// Hold the poll until the test thread has inspected remainingRecords.
continueLatch.await(10, TimeUnit.SECONDS);
// No break here: execution falls through to default (appears intentional).
1092+
default:
1093+
return ConsumerRecords.empty();
1094+
}
1095+
}).given(consumer).poll(any());
1096+
container.start();
1097+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1098+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
1099+
.getPropertyValue(container, "containers", List.class).get(0);
1100+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
1101+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
1102+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
1103+
ConsumerRecords remaining = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.remainingRecords",
1104+
ConsumerRecords.class);
1105+
// Only the records for the partitions that were not revoked should be left.
assertThat(remaining.count()).isEqualTo(2);
1106+
assertThat(remaining.partitions()).contains(tp1, tp3);
1107+
continueLatch.countDown();
1108+
verify(consumer, atLeastOnce()).pause(any());
1109+
verify(consumer, never()).resume(any());
1110+
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1111+
container.stop();
1112+
assertThat(recordsDelivered).hasSizeGreaterThanOrEqualTo(3);
1113+
// partitions 0, 2 revoked during second poll.
1114+
assertThat(recordsDelivered.get(0)).isEqualTo(record0);
1115+
assertThat(recordsDelivered.get(1)).isEqualTo(record1);
1116+
assertThat(recordsDelivered.get(2)).isEqualTo(record1);
9001117
}
9011118

9021119
public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

0 commit comments

Comments
 (0)