Skip to content

Commit c8088ef

Browse files
garyrussell authored and artembilan committed
GH-2581: Fix OOO Commits with Rebalance
Resolves #2581

If a rebalance occurred while pending out-of-order commits were present, the pending commits were correctly purged; however, the `consumerPaused` boolean remained true. This prevented the consumer from being paused again while the next batch of records was being processed, and subsequent polls returned more records.

Reset the `consumerPaused` boolean when all pending commits are purged.

Also, with a cooperative assignor, ensure that any newly assigned partitions are paused if pending commits are present.

Add tests to verify the correct behavior with a legacy and a cooperative assignor. Also tested with the reporter's reproducer.

**cherry-pick to 2.9.x**

* Fix unnecessary if test.
* Fix for if with `ObjectUtils.isEmpty()`

# Conflicts:
#	spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent b17a045 commit c8088ef

File tree

2 files changed

+244
-63
lines changed

2 files changed

+244
-63
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 69 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Arrays;
2424
import java.util.Collection;
2525
import java.util.Collections;
26+
import java.util.Comparator;
2627
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.Iterator;
@@ -122,6 +123,7 @@
122123
import org.springframework.util.Assert;
123124
import org.springframework.util.ClassUtils;
124125
import org.springframework.util.CollectionUtils;
126+
import org.springframework.util.ObjectUtils;
125127
import org.springframework.util.StringUtils;
126128
import org.springframework.util.concurrent.ListenableFuture;
127129
import org.springframework.util.concurrent.ListenableFutureCallback;
@@ -1977,38 +1979,37 @@ private void processAcks(ConsumerRecords<K, V> records) {
19771979
}
19781980
}
19791981

1980-
@SuppressWarnings(DEPRECATION)
1981-
private synchronized void ackInOrder(ConsumerRecord<K, V> record) {
1982-
TopicPartition part = new TopicPartition(record.topic(), record.partition());
1982+
private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
1983+
TopicPartition part = new TopicPartition(cRecord.topic(), cRecord.partition());
19831984
List<Long> offs = this.offsetsInThisBatch.get(part);
1984-
List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
1985-
if (offs.size() > 0) {
1986-
if (offs.get(0) == record.offset()) {
1985+
if (!ObjectUtils.isEmpty(offs)) {
1986+
List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
1987+
if (offs.get(0) == cRecord.offset()) {
19871988
offs.remove(0);
1988-
ConsumerRecord<K, V> recordToAck = record;
1989-
if (deferred.size() > 0) {
1990-
Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset()));
1991-
while (deferred.size() > 0 && deferred.get(0).offset() == recordToAck.offset() + 1) {
1989+
ConsumerRecord<K, V> recordToAck = cRecord;
1990+
if (!deferred.isEmpty()) {
1991+
deferred.sort(Comparator.comparingLong(ConsumerRecord::offset));
1992+
while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) {
19921993
recordToAck = deferred.remove(0);
19931994
offs.remove(0);
19941995
}
19951996
}
19961997
processAck(recordToAck);
1997-
if (offs.size() == 0) {
1998+
if (offs.isEmpty()) {
19981999
this.deferredOffsets.remove(part);
19992000
this.offsetsInThisBatch.remove(part);
20002001
}
20012002
}
2002-
else if (record.offset() < offs.get(0)) {
2003+
else if (cRecord.offset() < offs.get(0)) {
20032004
throw new IllegalStateException("First remaining offset for this batch is " + offs.get(0)
2004-
+ "; you are acknowledging a stale record: " + ListenerUtils.recordToString(record));
2005+
+ "; you are acknowledging a stale record: " + KafkaUtils.format(cRecord));
20052006
}
20062007
else {
2007-
deferred.add(record);
2008+
deferred.add(cRecord);
20082009
}
20092010
}
20102011
else {
2011-
throw new IllegalStateException("Unexpected ack for " + ListenerUtils.recordToString(record)
2012+
throw new IllegalStateException("Unexpected ack for " + KafkaUtils.format(cRecord)
20122013
+ "; offsets list is empty");
20132014
}
20142015
}
@@ -3371,12 +3372,12 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
33713372
@Override
33723373
public void acknowledge() {
33733374
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
3374-
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
33753375
if (!this.acked) {
3376-
for (ConsumerRecord<K, V> record : getHighestOffsetRecords(this.records)) {
3376+
for (ConsumerRecord<K, V> cRecord : getHighestOffsetRecords(this.records)) {
33773377
if (offs != null) {
3378-
offs.remove(new TopicPartition(record.topic(), record.partition()));
3379-
deferred.remove(new TopicPartition(record.topic(), record.partition()));
3378+
TopicPartition partitionToRemove = new TopicPartition(cRecord.topic(), cRecord.partition());
3379+
offs.remove(partitionToRemove);
3380+
ListenerConsumer.this.deferredOffsets.remove(partitionToRemove);
33803381
}
33813382
}
33823383
processAcks(this.records);
@@ -3435,50 +3436,47 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
34353436

34363437
@Override
34373438
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
3439+
this.revoked.addAll(partitions);
3440+
removeRevocationsFromPending(partitions);
3441+
if (this.consumerAwareListener != null) {
3442+
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
3443+
partitions);
3444+
}
3445+
else {
3446+
this.userListener.onPartitionsRevoked(partitions);
3447+
}
34383448
try {
3439-
this.revoked.addAll(partitions);
3440-
removeRevocationsFromPending(partitions);
3441-
if (this.consumerAwareListener != null) {
3442-
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
3443-
partitions);
3444-
}
3445-
else {
3446-
this.userListener.onPartitionsRevoked(partitions);
3447-
}
3448-
try {
3449-
// Wait until now to commit, in case the user listener added acks
3450-
checkRebalanceCommits();
3451-
commitPendingAcks();
3452-
fixTxOffsetsIfNeeded();
3453-
}
3454-
catch (Exception e) {
3455-
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
3456-
+ partitions);
3457-
}
3458-
if (this.consumerAwareListener != null) {
3459-
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
3460-
partitions);
3461-
}
3462-
if (ListenerConsumer.this.consumerSeekAwareListener != null) {
3463-
ListenerConsumer.this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
3464-
}
3465-
if (ListenerConsumer.this.assignedPartitions != null) {
3466-
ListenerConsumer.this.assignedPartitions.removeAll(partitions);
3467-
}
3468-
ListenerConsumer.this.pausedForNack.removeAll(partitions);
3469-
partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
3470-
synchronized (ListenerConsumer.this) {
3471-
if (ListenerConsumer.this.offsetsInThisBatch != null) {
3472-
partitions.forEach(tp -> {
3473-
ListenerConsumer.this.offsetsInThisBatch.remove(tp);
3474-
ListenerConsumer.this.deferredOffsets.remove(tp);
3475-
});
3476-
}
3477-
}
3449+
// Wait until now to commit, in case the user listener added acks
3450+
checkRebalanceCommits();
3451+
commitPendingAcks();
3452+
fixTxOffsetsIfNeeded();
34783453
}
3479-
finally {
3480-
if (ListenerConsumer.this.kafkaTxManager != null) {
3481-
closeProducers(partitions);
3454+
catch (Exception e) {
3455+
ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
3456+
+ partitions);
3457+
}
3458+
if (this.consumerAwareListener != null) {
3459+
this.consumerAwareListener.onPartitionsRevokedAfterCommit(ListenerConsumer.this.consumer,
3460+
partitions);
3461+
}
3462+
if (ListenerConsumer.this.consumerSeekAwareListener != null) {
3463+
ListenerConsumer.this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
3464+
}
3465+
if (ListenerConsumer.this.assignedPartitions != null) {
3466+
ListenerConsumer.this.assignedPartitions.removeAll(partitions);
3467+
}
3468+
ListenerConsumer.this.pausedForNack.removeAll(partitions);
3469+
partitions.forEach(ListenerConsumer.this.lastCommits::remove);
3470+
synchronized (ListenerConsumer.this) {
3471+
Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
3472+
if (pendingOffsets != null) {
3473+
partitions.forEach(tp -> {
3474+
pendingOffsets.remove(tp);
3475+
ListenerConsumer.this.deferredOffsets.remove(tp);
3476+
});
3477+
if (pendingOffsets.isEmpty()) {
3478+
ListenerConsumer.this.consumerPaused = false;
3479+
}
34823480
}
34833481
}
34843482
}
@@ -3531,7 +3529,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
35313529
}
35323530

35333531
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3534-
if (isPaused() || ListenerConsumer.this.remainingRecords != null && !partitions.isEmpty()) {
3532+
boolean pending = false;
3533+
synchronized (ListenerConsumer.this) {
3534+
Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
3535+
if (!ObjectUtils.isEmpty(pendingOffsets)) {
3536+
pending = true;
3537+
}
3538+
}
3539+
if ((pending || isPaused() || ListenerConsumer.this.remainingRecords != null)
3540+
&& !partitions.isEmpty()) {
3541+
35353542
ListenerConsumer.this.consumer.pause(partitions);
35363543
ListenerConsumer.this.consumerPaused = true;
35373544
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 175 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@
6666
import org.springframework.kafka.event.ConsumerStartedEvent;
6767
import org.springframework.kafka.event.ConsumerStartingEvent;
6868
import org.springframework.kafka.event.ListenerContainerIdleEvent;
69+
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6970
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
71+
import org.springframework.kafka.support.Acknowledgment;
7072
import org.springframework.kafka.test.utils.KafkaTestUtils;
7173
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
7274
import org.springframework.lang.Nullable;
@@ -894,7 +896,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept
894896

895897
@SuppressWarnings({ "unchecked", "rawtypes" })
896898
@Test
897-
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLagacyAssignor() throws InterruptedException {
899+
void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssignor() throws InterruptedException {
898900
TopicPartition tp0 = new TopicPartition("foo", 0);
899901
TopicPartition tp1 = new TopicPartition("foo", 1);
900902
TopicPartition tp2 = new TopicPartition("foo", 2);
@@ -1106,6 +1108,178 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
11061108
assertThat(recordsDelivered.get(2)).isEqualTo(record1);
11071109
}
11081110

1111+
@SuppressWarnings({ "unchecked", "rawtypes" })
1112+
@Test
1113+
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
1114+
TopicPartition tp0 = new TopicPartition("foo", 0);
1115+
TopicPartition tp1 = new TopicPartition("foo", 1);
1116+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
1117+
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
1118+
allRecordMap.put(tp0,
1119+
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
1120+
allRecordMap.put(tp1,
1121+
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
1122+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
1123+
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
1124+
AtomicInteger pollPhase = new AtomicInteger();
1125+
1126+
Consumer consumer = mock(Consumer.class);
1127+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
1128+
CountDownLatch subscribeLatch = new CountDownLatch(1);
1129+
willAnswer(invocation -> {
1130+
rebal.set(invocation.getArgument(1));
1131+
subscribeLatch.countDown();
1132+
return null;
1133+
}).given(consumer).subscribe(any(Collection.class), any());
1134+
CountDownLatch pauseLatch = new CountDownLatch(1);
1135+
AtomicBoolean paused = new AtomicBoolean();
1136+
willAnswer(inv -> {
1137+
paused.set(true);
1138+
pauseLatch.countDown();
1139+
return null;
1140+
}).given(consumer).pause(any());
1141+
ConsumerFactory cf = mock(ConsumerFactory.class);
1142+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
1143+
given(cf.getConfigurationProperties())
1144+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
1145+
ContainerProperties containerProperties = new ContainerProperties("foo");
1146+
containerProperties.setGroupId("grp");
1147+
containerProperties.setAckMode(AckMode.MANUAL);
1148+
containerProperties.setMessageListener(ackOffset1());
1149+
containerProperties.setAsyncAcks(true);
1150+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
1151+
containerProperties);
1152+
CountDownLatch pollLatch = new CountDownLatch(2);
1153+
CountDownLatch rebalLatch = new CountDownLatch(1);
1154+
CountDownLatch continueLatch = new CountDownLatch(1);
1155+
willAnswer(inv -> {
1156+
Thread.sleep(50);
1157+
pollLatch.countDown();
1158+
switch (pollPhase.getAndIncrement()) {
1159+
case 0:
1160+
rebal.get().onPartitionsAssigned(allAssignments);
1161+
return allRecords;
1162+
case 1:
1163+
rebal.get().onPartitionsRevoked(allAssignments);
1164+
rebal.get().onPartitionsAssigned(afterRevokeAssignments);
1165+
rebalLatch.countDown();
1166+
continueLatch.await(10, TimeUnit.SECONDS);
1167+
default:
1168+
return ConsumerRecords.empty();
1169+
}
1170+
}).given(consumer).poll(any());
1171+
container.start();
1172+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1173+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
1174+
.getPropertyValue(container, "containers", List.class).get(0);
1175+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
1176+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
1177+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
1178+
Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
1179+
assertThat(offsets).hasSize(0);
1180+
assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isFalse();
1181+
continueLatch.countDown();
1182+
// no pause when re-assigned because all revoked
1183+
verify(consumer).pause(any());
1184+
verify(consumer, never()).resume(any());
1185+
container.stop();
1186+
}
1187+
1188+
@SuppressWarnings({ "unchecked", "rawtypes" })
1189+
@Test
1190+
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
1191+
TopicPartition tp0 = new TopicPartition("foo", 0);
1192+
TopicPartition tp1 = new TopicPartition("foo", 1);
1193+
List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
1194+
Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
1195+
allRecordMap.put(tp0,
1196+
List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
1197+
allRecordMap.put(tp1,
1198+
List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
1199+
ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
1200+
List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
1201+
AtomicInteger pollPhase = new AtomicInteger();
1202+
1203+
Consumer consumer = mock(Consumer.class);
1204+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
1205+
CountDownLatch subscribeLatch = new CountDownLatch(1);
1206+
willAnswer(invocation -> {
1207+
rebal.set(invocation.getArgument(1));
1208+
subscribeLatch.countDown();
1209+
return null;
1210+
}).given(consumer).subscribe(any(Collection.class), any());
1211+
CountDownLatch pauseLatch = new CountDownLatch(1);
1212+
AtomicBoolean paused = new AtomicBoolean();
1213+
willAnswer(inv -> {
1214+
paused.set(true);
1215+
pauseLatch.countDown();
1216+
return null;
1217+
}).given(consumer).pause(any());
1218+
ConsumerFactory cf = mock(ConsumerFactory.class);
1219+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
1220+
given(cf.getConfigurationProperties())
1221+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
1222+
ContainerProperties containerProperties = new ContainerProperties("foo");
1223+
containerProperties.setGroupId("grp");
1224+
containerProperties.setAckMode(AckMode.MANUAL);
1225+
containerProperties.setMessageListener(ackOffset1());
1226+
containerProperties.setAsyncAcks(true);
1227+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
1228+
containerProperties);
1229+
CountDownLatch pollLatch = new CountDownLatch(2);
1230+
CountDownLatch rebalLatch = new CountDownLatch(1);
1231+
CountDownLatch continueLatch = new CountDownLatch(1);
1232+
willAnswer(inv -> {
1233+
Thread.sleep(50);
1234+
pollLatch.countDown();
1235+
switch (pollPhase.getAndIncrement()) {
1236+
case 0:
1237+
rebal.get().onPartitionsAssigned(allAssignments);
1238+
return allRecords;
1239+
case 1:
1240+
rebal.get().onPartitionsRevoked(List.of(tp0));
1241+
rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
1242+
rebalLatch.countDown();
1243+
continueLatch.await(10, TimeUnit.SECONDS);
1244+
default:
1245+
return ConsumerRecords.empty();
1246+
}
1247+
}).given(consumer).poll(any());
1248+
container.start();
1249+
assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
1250+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
1251+
.getPropertyValue(container, "containers", List.class).get(0);
1252+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
1253+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
1254+
assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
1255+
Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
1256+
assertThat(offsets).hasSize(1);
1257+
assertThat(offsets.get(tp1)).isNotNull();
1258+
assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isTrue();
1259+
continueLatch.countDown();
1260+
verify(consumer, times(2)).pause(any());
1261+
verify(consumer, never()).resume(any());
1262+
container.stop();
1263+
}
1264+
1265+
@SuppressWarnings("rawtypes")
1266+
private AcknowledgingMessageListener ackOffset1() {
1267+
return new AcknowledgingMessageListener() {
1268+
1269+
@Override
1270+
public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
1271+
if (rec.offset() == 1) {
1272+
ack.acknowledge();
1273+
}
1274+
}
1275+
1276+
@Override
1277+
public void onMessage(Object data) {
1278+
}
1279+
1280+
};
1281+
}
1282+
11091283
public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {
11101284

11111285
private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

0 commit comments

Comments
 (0)