Skip to content

Commit e7e6c9d

Browse files
garyrussell authored and artembilan committed
GH-623: Fix AckMode.COUNT
Fixes #623 Grab ack count before moving acks to offsets. # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent 5e877a8 commit e7e6c9d

File tree

3 files changed

+71
-1
lines changed

3 files changed

+71
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1027,8 +1027,8 @@ private void sendOffsetsToTransaction(Producer producer) {
10271027
}
10281028

10291029
private void processCommits() {
1030-
handleAcks();
10311030
this.count += this.acks.size();
1031+
handleAcks();
10321032
long now;
10331033
AckMode ackMode = this.containerProperties.getAckMode();
10341034
if (!this.isManualImmediateAck) {

spring-kafka/src/main/java/org/springframework/kafka/support/SendResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,9 @@ public RecordMetadata getRecordMetadata() {
4747
return this.recordMetadata;
4848
}
4949

50+
@Override
51+
public String toString() {
52+
return "SendResult [producerRecord=" + this.producerRecord + ", recordMetadata=" + this.recordMetadata + "]";
53+
}
54+
5055
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,6 +1396,71 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
13961396
logger.info("Stop rebalance after failed record");
13971397
}
13981398

1399+
@SuppressWarnings({ "unchecked", "rawtypes" })
1400+
@Test
1401+
public void testAckModeCount() throws Exception {
1402+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
1403+
Consumer<Integer, String> consumer = mock(Consumer.class);
1404+
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
1405+
TopicPartition topicPartition = new TopicPartition("foo", 0);
1406+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records1 = new HashMap<>();
1407+
records1.put(topicPartition, Arrays.asList(
1408+
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
1409+
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
1410+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records2 = new HashMap<>();
1411+
records2.put(topicPartition, Arrays.asList(
1412+
new ConsumerRecord<>("foo", 0, 2L, 1, "baz"),
1413+
new ConsumerRecord<>("foo", 0, 3L, 1, "qux"))); // commit (4 >= 3)
1414+
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records3 = new HashMap<>();
1415+
records3.put(topicPartition, Arrays.asList(
1416+
new ConsumerRecord<>("foo", 0, 4L, 1, "fiz"),
1417+
new ConsumerRecord<>("foo", 0, 5L, 1, "buz"),
1418+
new ConsumerRecord<>("foo", 0, 6L, 1, "bif"))); // commit (3 >= 3)
1419+
ConsumerRecords<Integer, String> consumerRecords1 = new ConsumerRecords<>(records1);
1420+
ConsumerRecords<Integer, String> consumerRecords2 = new ConsumerRecords<>(records2);
1421+
ConsumerRecords<Integer, String> consumerRecords3 = new ConsumerRecords<>(records3);
1422+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
1423+
AtomicInteger which = new AtomicInteger();
1424+
given(consumer.poll(anyLong())).willAnswer(i -> {
1425+
Thread.sleep(50);
1426+
int recordsToUse = which.incrementAndGet();
1427+
switch (recordsToUse) {
1428+
case 1:
1429+
return consumerRecords1;
1430+
case 2:
1431+
return consumerRecords2;
1432+
case 3:
1433+
return consumerRecords3;
1434+
default:
1435+
return emptyRecords;
1436+
}
1437+
});
1438+
final CountDownLatch commitLatch = new CountDownLatch(2);
1439+
willAnswer(i -> {
1440+
commitLatch.countDown();
1441+
return null;
1442+
}).given(consumer).commitSync(any(Map.class));
1443+
given(consumer.assignment()).willReturn(records1.keySet());
1444+
TopicPartitionInitialOffset[] topicPartitionOffset = new TopicPartitionInitialOffset[] {
1445+
new TopicPartitionInitialOffset("foo", 0) };
1446+
ContainerProperties containerProps = new ContainerProperties(topicPartitionOffset);
1447+
containerProps.setAckMode(AckMode.COUNT);
1448+
containerProps.setAckCount(3);
1449+
containerProps.setClientId("clientId");
1450+
AtomicInteger recordCount = new AtomicInteger();
1451+
containerProps.setMessageListener((MessageListener) r -> {
1452+
recordCount.incrementAndGet();
1453+
});
1454+
KafkaMessageListenerContainer<Integer, String> container =
1455+
new KafkaMessageListenerContainer<>(cf, containerProps);
1456+
container.start();
1457+
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
1458+
assertThat(recordCount.get()).isEqualTo(7);
1459+
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(4L)));
1460+
verify(consumer).commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(7L)));
1461+
container.stop();
1462+
}
1463+
13991464
private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
14001465
Consumer<?, ?> consumer = spy(
14011466
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));

0 commit comments

Comments
 (0)