From d408c27efa0f226bb72c5a5ae543175a5cfc587c Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Tue, 3 Apr 2018 10:48:05 -0400 Subject: [PATCH] GH-637: Fix spurious nonresponsive consumer events Fixes https://github.com/spring-projects/spring-kafka/issues/637 Wrong timestamp used for event publication causing invalid events. --- .../KafkaMessageListenerContainer.java | 5 ++- .../KafkaMessageListenerContainerTests.java | 34 +++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 1db61d83d4..a18051cb10 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -415,6 +415,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private boolean consumerPaused; + private volatile long lastPoll = System.currentTimeMillis(); + @SuppressWarnings("unchecked") ListenerConsumer(GenericMessageListener listener, ListenerType listenerType) { Assert.state(!this.isAnyManualAck || !this.autoCommit, @@ -502,7 +504,7 @@ else if (listener instanceof MessageListener) { } protected void checkConsumer() { - long timeSinceLastPoll = System.currentTimeMillis() - last; + long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll; if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout() > this.containerProperties.getNoPollThreshold()) { publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer); @@ -695,6 +697,7 @@ public void run() { publishConsumerPausedEvent(this.consumer.assignment()); } ConsumerRecords records = this.consumer.poll(this.containerProperties.getPollTimeout()); + this.lastPoll = System.currentTimeMillis(); if (this.consumerPaused && !isPaused()) { if (this.logger.isDebugEnabled()) { this.logger.debug("Resuming consumption from: " + this.consumer.paused()); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 7576a4ee5b..beb33be290 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -664,6 +664,40 @@ public void testNonResponsiveConsumerEvent() throws Exception { container.stop(); } + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Test + public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws Exception { + ConsumerFactory cf = mock(ConsumerFactory.class); + Consumer consumer = mock(Consumer.class); + given(cf.createConsumer(isNull(), eq(""), isNull())).willReturn(consumer); + ConsumerRecords records = new ConsumerRecords(Collections.emptyMap()); + CountDownLatch latch = new CountDownLatch(20); + given(consumer.poll(anyLong())).willAnswer(i -> { + Thread.sleep(100); + latch.countDown(); + return records; + }); + TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] { + new TopicPartitionInitialOffset("foo", 0) }; + ContainerProperties containerProps = new ContainerProperties(topicPartition); + containerProps.setNoPollThreshold(2.0f); + containerProps.setPollTimeout(100); + containerProps.setMonitorInterval(1); + containerProps.setMessageListener(mock(MessageListener.class)); + KafkaMessageListenerContainer container = + new KafkaMessageListenerContainer<>(cf, containerProps); + final AtomicInteger eventCounter = new AtomicInteger(); + container.setApplicationEventPublisher(e -> { + if (e instanceof NonResponsiveConsumerEvent) { + eventCounter.incrementAndGet(); + } + }); + container.start(); + assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(eventCounter.get()).isEqualTo(0); + } + @Test public void testBatchAck() throws Exception { logger.info("Start batch ack");