
Commit 21e2cf7

GH-2581: Fix OOO Commits with Rebalance
Resolves #2581

If a rebalance occurred while pending out-of-order commits were present, the pending commits were correctly purged; however, the `consumerPaused` boolean remained `true`. This prevented the consumer from being paused again while the next batch of records was being processed, so subsequent polls returned more records. Reset the `consumerPaused` boolean when all pending commits are purged. Also, with a cooperative assignor, ensure that any newly assigned partitions are paused if pending commits are present.

Add tests to verify the correct behavior with a legacy and a cooperative assignor. Also tested with the reporter's reproducer.

**cherry-pick to 2.9.x**

* Fix unnecessary if test.
* Fix for `if` with `ObjectUtils.isEmpty()`.
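For context, the out-of-order commit state this fix touches (`offsetsInThisBatch`, `deferredOffsets`) only exists when the container is configured for manual acknowledgments with async acks. A minimal sketch of that configuration (topic and group names are illustrative; the properties mirror those used in the new tests below):

// Sketch: enable out-of-order (async) manual acks on org.springframework.kafka.listener.ContainerProperties
ContainerProperties containerProperties = new ContainerProperties("someTopic"); // illustrative topic
containerProperties.setGroupId("someGroup");                                    // illustrative group id
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL);             // listener acknowledges manually
containerProperties.setAsyncAcks(true);                                         // acks may arrive out of order

With this configuration, an acknowledgment for a later offset is deferred until the earlier offsets in the batch are acknowledged, and the consumer stays paused while such commits are pending; that pending/paused state is what the rebalance handling in this commit has to clean up correctly.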
1 parent: 5e857bd

File tree: 2 files changed (+196, −8 lines)

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 21 additions & 7 deletions
@@ -130,6 +130,7 @@
 import org.springframework.util.Assert;
 import org.springframework.util.ClassUtils;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.ObjectUtils;
 import org.springframework.util.StringUtils;

 import io.micrometer.observation.Observation;
@@ -2085,14 +2086,14 @@ private void processAcks(ConsumerRecords<K, V> records) {
     private synchronized void ackInOrder(ConsumerRecord<K, V> cRecord) {
         TopicPartition part = new TopicPartition(cRecord.topic(), cRecord.partition());
         List<Long> offs = this.offsetsInThisBatch.get(part);
-        List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
-        if (!offs.isEmpty()) {
+        if (!ObjectUtils.isEmpty(offs)) {
+            List<ConsumerRecord<K, V>> deferred = this.deferredOffsets.get(part);
             if (offs.get(0) == cRecord.offset()) {
                 offs.remove(0);
                 ConsumerRecord<K, V> recordToAck = cRecord;
                 if (!deferred.isEmpty()) {
                     Collections.sort(deferred, (a, b) -> Long.compare(a.offset(), b.offset()));
-                    while (!deferred.isEmpty() && deferred.get(0).offset() == recordToAck.offset() + 1) {
+                    while (!ObjectUtils.isEmpty(deferred) && deferred.get(0).offset() == recordToAck.offset() + 1) {
                         recordToAck = deferred.remove(0);
                         offs.remove(0);
                     }
@@ -3434,8 +3435,8 @@ private final class ConsumerBatchAcknowledgment implements Acknowledgment {
         @Override
         public void acknowledge() {
             Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
-            Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
             if (!this.acked) {
+                Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
                 for (ConsumerRecord<K, V> cRecord : getHighestOffsetRecords(this.records)) {
                     if (offs != null) {
                         offs.remove(new TopicPartition(cRecord.topic(), cRecord.partition()));
@@ -3529,11 +3530,15 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             ListenerConsumer.this.pausedForNack.removeAll(partitions);
             partitions.forEach(ListenerConsumer.this.lastCommits::remove);
             synchronized (ListenerConsumer.this) {
-                if (ListenerConsumer.this.offsetsInThisBatch != null) {
+                Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
+                if (pendingOffsets != null) {
                     partitions.forEach(tp -> {
-                        ListenerConsumer.this.offsetsInThisBatch.remove(tp);
+                        pendingOffsets.remove(tp);
                         ListenerConsumer.this.deferredOffsets.remove(tp);
                     });
+                    if (pendingOffsets.isEmpty()) {
+                        ListenerConsumer.this.consumerPaused = false;
+                    }
                 }
             }
         }
@@ -3586,7 +3591,16 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         }

         private void repauseIfNeeded(Collection<TopicPartition> partitions) {
-            if (isPaused() || ListenerConsumer.this.remainingRecords != null && !partitions.isEmpty()) {
+            boolean pending = false;
+            synchronized (ListenerConsumer.this) {
+                Map<TopicPartition, List<Long>> pendingOffsets = ListenerConsumer.this.offsetsInThisBatch;
+                if (!ObjectUtils.isEmpty(pendingOffsets)) {
+                    pending = true;
+                }
+            }
+            if ((pending || isPaused() || ListenerConsumer.this.remainingRecords != null)
+                    && !partitions.isEmpty()) {
+
                 ListenerConsumer.this.consumer.pause(partitions);
                 ListenerConsumer.this.consumerPaused = true;
                 ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
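A note on the `ObjectUtils.isEmpty(...)` changes above: unlike calling `isEmpty()` on the collection itself, `ObjectUtils.isEmpty()` is null-safe, which guards the case where a partition's entries have already been purged (for example, during a rebalance) and the lookups in `ackInOrder()` return `null`. A minimal illustration (the variable value is hypothetical):

// Hypothetical: the partition's entry was already removed, so the map lookup returned null.
List<Long> offs = null;
// offs.isEmpty()            -> throws NullPointerException
// ObjectUtils.isEmpty(offs) -> true (null-safe; also true for an empty collection)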

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 175 additions & 1 deletion
@@ -66,7 +66,9 @@
 import org.springframework.kafka.event.ConsumerStartedEvent;
 import org.springframework.kafka.event.ConsumerStartingEvent;
 import org.springframework.kafka.event.ListenerContainerIdleEvent;
+import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
+import org.springframework.kafka.support.Acknowledgment;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
 import org.springframework.lang.Nullable;
@@ -904,7 +906,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept

     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Test
-    void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLagacyAssignor() throws InterruptedException {
+    void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssignor() throws InterruptedException {
         TopicPartition tp0 = new TopicPartition("foo", 0);
         TopicPartition tp1 = new TopicPartition("foo", 1);
         TopicPartition tp2 = new TopicPartition("foo", 2);
@@ -1116,6 +1118,178 @@ public boolean handleOne(Exception thrownException, ConsumerRecord<?, ?> record,
         assertThat(recordsDelivered.get(2)).isEqualTo(record1);
     }

+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("foo", 1);
+        List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
+        allRecordMap.put(tp0,
+                List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
+        allRecordMap.put(tp1,
+                List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
+        ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
+        List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
+        AtomicInteger pollPhase = new AtomicInteger();
+
+        Consumer consumer = mock(Consumer.class);
+        AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
+        CountDownLatch subscribeLatch = new CountDownLatch(1);
+        willAnswer(invocation -> {
+            rebal.set(invocation.getArgument(1));
+            subscribeLatch.countDown();
+            return null;
+        }).given(consumer).subscribe(any(Collection.class), any());
+        CountDownLatch pauseLatch = new CountDownLatch(1);
+        AtomicBoolean paused = new AtomicBoolean();
+        willAnswer(inv -> {
+            paused.set(true);
+            pauseLatch.countDown();
+            return null;
+        }).given(consumer).pause(any());
+        ConsumerFactory cf = mock(ConsumerFactory.class);
+        given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+        given(cf.getConfigurationProperties())
+                .willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
+        ContainerProperties containerProperties = new ContainerProperties("foo");
+        containerProperties.setGroupId("grp");
+        containerProperties.setAckMode(AckMode.MANUAL);
+        containerProperties.setMessageListener(ackOffset1());
+        containerProperties.setAsyncAcks(true);
+        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
+                containerProperties);
+        CountDownLatch pollLatch = new CountDownLatch(2);
+        CountDownLatch rebalLatch = new CountDownLatch(1);
+        CountDownLatch continueLatch = new CountDownLatch(1);
+        willAnswer(inv -> {
+            Thread.sleep(50);
+            pollLatch.countDown();
+            switch (pollPhase.getAndIncrement()) {
+                case 0:
+                    rebal.get().onPartitionsAssigned(allAssignments);
+                    return allRecords;
+                case 1:
+                    rebal.get().onPartitionsRevoked(allAssignments);
+                    rebal.get().onPartitionsAssigned(afterRevokeAssignments);
+                    rebalLatch.countDown();
+                    continueLatch.await(10, TimeUnit.SECONDS);
+                default:
+                    return ConsumerRecords.empty();
+            }
+        }).given(consumer).poll(any());
+        container.start();
+        assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
+                .getPropertyValue(container, "containers", List.class).get(0);
+        assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
+        assertThat(offsets).hasSize(0);
+        assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isFalse();
+        continueLatch.countDown();
+        // no pause when re-assigned because all revoked
+        verify(consumer).pause(any());
+        verify(consumer, never()).resume(any());
+        container.stop();
+    }
+
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException {
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("foo", 1);
+        List<TopicPartition> allAssignments = Arrays.asList(tp0, tp1);
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> allRecordMap = new HashMap<>();
+        allRecordMap.put(tp0,
+                List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar")));
+        allRecordMap.put(tp1,
+                List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar")));
+        ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap);
+        List<TopicPartition> afterRevokeAssignments = Arrays.asList(tp1);
+        AtomicInteger pollPhase = new AtomicInteger();
+
+        Consumer consumer = mock(Consumer.class);
+        AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
+        CountDownLatch subscribeLatch = new CountDownLatch(1);
+        willAnswer(invocation -> {
+            rebal.set(invocation.getArgument(1));
+            subscribeLatch.countDown();
+            return null;
+        }).given(consumer).subscribe(any(Collection.class), any());
+        CountDownLatch pauseLatch = new CountDownLatch(1);
+        AtomicBoolean paused = new AtomicBoolean();
+        willAnswer(inv -> {
+            paused.set(true);
+            pauseLatch.countDown();
+            return null;
+        }).given(consumer).pause(any());
+        ConsumerFactory cf = mock(ConsumerFactory.class);
+        given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
+        given(cf.getConfigurationProperties())
+                .willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"));
+        ContainerProperties containerProperties = new ContainerProperties("foo");
+        containerProperties.setGroupId("grp");
+        containerProperties.setAckMode(AckMode.MANUAL);
+        containerProperties.setMessageListener(ackOffset1());
+        containerProperties.setAsyncAcks(true);
+        ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
+                containerProperties);
+        CountDownLatch pollLatch = new CountDownLatch(2);
+        CountDownLatch rebalLatch = new CountDownLatch(1);
+        CountDownLatch continueLatch = new CountDownLatch(1);
+        willAnswer(inv -> {
+            Thread.sleep(50);
+            pollLatch.countDown();
+            switch (pollPhase.getAndIncrement()) {
+                case 0:
+                    rebal.get().onPartitionsAssigned(allAssignments);
+                    return allRecords;
+                case 1:
+                    rebal.get().onPartitionsRevoked(List.of(tp0));
+                    rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2)));
+                    rebalLatch.countDown();
+                    continueLatch.await(10, TimeUnit.SECONDS);
+                default:
+                    return ConsumerRecords.empty();
+            }
+        }).given(consumer).poll(any());
+        container.start();
+        assertThat(subscribeLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
+                .getPropertyValue(container, "containers", List.class).get(0);
+        assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        assertThat(rebalLatch.await(10, TimeUnit.SECONDS)).isTrue();
+        Map offsets = KafkaTestUtils.getPropertyValue(child, "listenerConsumer.offsetsInThisBatch", Map.class);
+        assertThat(offsets).hasSize(1);
+        assertThat(offsets.get(tp1)).isNotNull();
+        assertThat(KafkaTestUtils.getPropertyValue(child, "listenerConsumer.consumerPaused", Boolean.class)).isTrue();
+        continueLatch.countDown();
+        verify(consumer, times(2)).pause(any());
+        verify(consumer, never()).resume(any());
+        container.stop();
+    }
+
+    @SuppressWarnings("rawtypes")
+    private AcknowledgingMessageListener ackOffset1() {
+        return new AcknowledgingMessageListener() {
+
+            @Override
+            public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) {
+                if (rec.offset() == 1) {
+                    ack.acknowledge();
+                }
+            }
+
+            @Override
+            public void onMessage(Object data) {
+            }
+
+        };
+    }
+
     public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {

         private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();
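The legacy-assignor test revokes every partition, so it expects `offsetsInThisBatch` to end up empty and `consumerPaused` to be reset to false; the cooperative-assignor test revokes only `tp0`, so `tp1`'s pending commit survives and the newly assigned partition must be paused again (hence the second `pause(...)` verification). Outside these tests, cooperative rebalancing comes from the consumer's assignment strategy; a sketch of the standard kafka-clients setting (an application-side assumption, not part of this commit):

// Sketch: opt into incremental (cooperative) rebalancing so only some partitions are revoked at a time.
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(org.apache.kafka.clients.consumer.ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName());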
