Commit d978246

Tom van den Berge authored and artembilan committed
GH-511: Committing offset immediately on error
Fixes: #511 **Cherry-pick to 2.0.x & master**
1 parent f41b348 commit d978246
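
As the diff below shows, when ackOnError is enabled, auto-commit is off and the container runs with record-level acks, the failed record's offset is now committed immediately (synchronously or asynchronously, per syncCommits) instead of being queued with the regular acks. The following is a minimal, hypothetical configuration sketch, not part of the commit (broker address, topic and group id are invented; package locations assume the 1.x/2.0.x layout of this branch, with ContainerProperties in org.springframework.kafka.listener.config and AckMode nested in AbstractMessageListenerContainer), showing a container set up so that the new path applies:

// Hypothetical sketch, not from the commit: a container configured so that a listener
// failure triggers the new immediate-commit branch (RECORD acks, ackOnError, no auto-commit).
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.config.ContainerProperties;

public class AckOnErrorConfigSketch {

    public static KafkaMessageListenerContainer<Integer, String> build() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // invented
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "ack-on-error-demo");       // invented
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ContainerProperties containerProps = new ContainerProperties("someTopic");    // invented
        containerProps.setAckMode(AckMode.RECORD); // per-record acks: the branch changed here
        containerProps.setAckOnError(true);        // commit the failed record's offset as well
        containerProps.setSyncCommits(true);       // the fix then uses commitSync(..)
        containerProps.setMessageListener((MessageListener<Integer, String>) record -> {
            throw new RuntimeException("simulated listener failure for offset " + record.offset());
        });

        return new KafkaMessageListenerContainer<>(
                new DefaultKafkaConsumerFactory<>(consumerProps), containerProps);
    }

}

With such a setup, a listener exception on a record leads to an immediate commit of that record's offset + 1, so the record is not redelivered after a rebalance.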

2 files changed: +118 -3 lines

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 15 additions & 1 deletion

@@ -88,6 +88,7 @@
  * @author Loic Talhouarne
  * @author Vladimir Tsanev
  * @author Yang Qiju
+ * @author Tom van den Berge
  */
 public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

@@ -955,7 +956,20 @@ else if (!this.isAnyManualAck && !this.autoCommit) {
             }
             catch (RuntimeException e) {
                 if (this.containerProperties.isAckOnError() && !this.autoCommit && producer == null) {
-                    this.acks.add(record);
+                    if (this.isRecordAck) {
+                        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                                Collections.singletonMap(new TopicPartition(record.topic(), record.partition()),
+                                        new OffsetAndMetadata(record.offset() + 1));
+                        if (this.containerProperties.isSyncCommits()) {
+                            this.consumer.commitSync(offsetsToCommit);
+                        }
+                        else {
+                            this.consumer.commitAsync(offsetsToCommit, this.commitCallback);
+                        }
+                    }
+                    else if (!this.isAnyManualAck) {
+                        this.acks.add(record);
+                    }
                 }
                 if (this.errorHandler == null) {
                     throw e;

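The key detail in the new branch is the committed value: Kafka's committed offset is the position of the next record to consume, so the container commits record.offset() + 1 for the record whose processing just failed. Below is a stand-alone sketch of the same convention against the plain kafka-clients Consumer API; the helper name and the no-op completion callback are illustrative, not from the patch.

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class ImmediateCommitSketch {

    // Commit the offset of a single (failed) record so the group will not see it again:
    // the committed value is record.offset() + 1, i.e. the next record to be consumed.
    static void commitRecord(Consumer<?, ?> consumer, ConsumerRecord<?, ?> record, boolean syncCommits) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        if (syncCommits) {
            consumer.commitSync(offsetsToCommit); // blocks until the broker acknowledges
        }
        else {
            consumer.commitAsync(offsetsToCommit, (offsets, exception) -> {
                // completion callback; the container passes its configured commitCallback here
            });
        }
    }

}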
spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 103 additions & 2 deletions

@@ -84,6 +84,7 @@
  * @author Martin Dam
  * @author Artem Bilan
  * @author Loic Talhouarne
+ * @author Tom van den Berge
  */
 public class KafkaMessageListenerContainerTests {

@@ -115,9 +116,11 @@ public class KafkaMessageListenerContainerTests {

     private static String topic17 = "testTopic17";

+    private static String topic18 = "testTopic18";
+
     @ClassRule
     public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, topic5,
-            topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17);
+            topic6, topic7, topic8, topic9, topic10, topic11, topic12, topic13, topic14, topic15, topic16, topic17, topic18);

     @Rule
     public TestName testName = new TestName();

@@ -1225,6 +1228,102 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         logger.info("Stop manual ack rebalance");
     }

+    @Test
+    public void testRebalanceAfterFailedRecord() throws Exception {
+        logger.info("Start rebalance after failed record");
+        Map<String, Object> props = KafkaTestUtils.consumerProps("test18", "false", embeddedKafka);
+        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
+        ContainerProperties containerProps = new ContainerProperties(topic18);
+        final List<AtomicInteger> counts = new ArrayList<>();
+        counts.add(new AtomicInteger());
+        counts.add(new AtomicInteger());
+        containerProps.setMessageListener(new MessageListener<Integer, String>() {
+
+            @Override
+            public void onMessage(ConsumerRecord<Integer, String> message) {
+                // The 1st message per partition fails
+                if (counts.get(message.partition()).incrementAndGet() < 2) {
+                    throw new RuntimeException("Failure while processing message");
+                }
+            }
+        });
+        containerProps.setSyncCommits(true);
+        containerProps.setAckMode(AckMode.RECORD);
+        final CountDownLatch rebalanceLatch = new CountDownLatch(2);
+        containerProps.setConsumerRebalanceListener(new ConsumerRebalanceListener() {

+            @Override
+            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            }
+
+            @Override
+            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+                logger.info("manual ack: assigned " + partitions);
+                rebalanceLatch.countDown();
+            }
+        });
+
+        CountDownLatch stubbingComplete1 = new CountDownLatch(1);
+        KafkaMessageListenerContainer<Integer, String> container1 =
+                spyOnContainer(new KafkaMessageListenerContainer<>(cf, containerProps), stubbingComplete1);
+        container1.setBeanName("testRebalanceAfterFailedRecord");
+        container1.start();
+        Consumer<?, ?> containerConsumer = spyOnConsumer(container1);
+        final CountDownLatch commitLatch = new CountDownLatch(2);
+        willAnswer(invocation -> {
+
+            @SuppressWarnings({ "unchecked" })
+            Map<TopicPartition, OffsetAndMetadata> map = invocation.getArgumentAt(0, Map.class);
+            try {
+                return invocation.callRealMethod();
+            }
+            finally {
+                for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
+                    // Decrement when the last (successful) offset has been committed
+                    if (entry.getValue().offset() == 2) {
+                        commitLatch.countDown();
+                    }
+                }
+            }
+
+        }).given(containerConsumer).commitSync(any());
+        stubbingComplete1.countDown();
+        ContainerTestUtils.waitForAssignment(container1, embeddedKafka.getPartitionsPerTopic());
+
+        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
+        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
+        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
+        template.setDefaultTopic(topic18);
+        template.sendDefault(0, 0, "foo");
+        template.sendDefault(1, 0, "baz");
+        template.sendDefault(0, 0, "bar");
+        template.sendDefault(1, 0, "qux");
+        template.flush();
+
+        // Wait until both partitions have committed offset 2 (i.e. the last message)
+        assertThat(commitLatch.await(30, TimeUnit.SECONDS)).isTrue();
+
+        // Start a 2nd consumer, triggering a rebalance
+        KafkaMessageListenerContainer<Integer, String> container2 =
+                new KafkaMessageListenerContainer<>(cf, containerProps);
+        container2.setBeanName("testRebalanceAfterFailedRecord2");
+        container2.start();
+        // Wait until both consumers have finished rebalancing
+        assertThat(rebalanceLatch.await(60, TimeUnit.SECONDS)).isTrue();
+
+        // Stop both consumers
+        container1.stop();
+        container2.stop();
+        Consumer<Integer, String> consumer = cf.createConsumer();
+        consumer.assign(Arrays.asList(new TopicPartition(topic18, 0), new TopicPartition(topic18, 1)));
+
+        // Verify that the position of both partitions is the highest committed offset
+        assertThat(consumer.position(new TopicPartition(topic18, 0))).isEqualTo(2);
+        assertThat(consumer.position(new TopicPartition(topic18, 1))).isEqualTo(2);
+        consumer.close();
+        logger.info("Stop rebalance after failed record");
+    }
+
     private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
         Consumer<?, ?> consumer = spy(
                 KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

@@ -1233,8 +1332,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
         return consumer;
     }

-    private KafkaMessageListenerContainer<Integer, String> spyOnContainer(KafkaMessageListenerContainer<Integer, String> container,
+    private KafkaMessageListenerContainer<Integer, String> spyOnContainer(
+            KafkaMessageListenerContainer<Integer, String> container,
             final CountDownLatch stubbingComplete) {
+
         KafkaMessageListenerContainer<Integer, String> spy = spy(container);
         willAnswer(i -> {
             if (stubbingComplete.getCount() > 0 && Thread.currentThread().getName().endsWith("-C-1")) {

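The test verifies the outcome via consumer.position(..) after assign(..), which the client initializes from the group's committed offset. A slightly more direct check, sketched here as a hypothetical helper that is not part of the test, reads the committed offset itself using the single-partition committed(TopicPartition) overload available in the kafka-clients versions of this era:

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommittedOffsetSketch {

    // Returns the committed offset for the topic/partition as seen by this consumer's group,
    // or -1 if nothing has been committed yet.
    static long committedOffset(Consumer<?, ?> consumer, String topic, int partition) {
        OffsetAndMetadata committed = consumer.committed(new TopicPartition(topic, partition));
        return (committed == null) ? -1L : committed.offset();
    }

}

With the records sent in the test, both partitions of topic18 would be expected to report a committed offset of 2.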