Skip to content

Commit

Permalink
GH-637: Fix spurious nonresponsive consumer events
Browse files Browse the repository at this point in the history
Fixes #637

Wrong timestamp used for event publication causing invalid events.
  • Loading branch information
garyrussell authored and artembilan committed Apr 3, 2018
1 parent 3706405 commit d408c27
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private boolean consumerPaused;

private volatile long lastPoll = System.currentTimeMillis();

@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
Expand Down Expand Up @@ -502,7 +504,7 @@ else if (listener instanceof MessageListener) {
}

protected void checkConsumer() {
long timeSinceLastPoll = System.currentTimeMillis() - last;
long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll;
if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout()
> this.containerProperties.getNoPollThreshold()) {
publishNonResponsiveConsumerEvent(timeSinceLastPoll, this.consumer);
Expand Down Expand Up @@ -695,6 +697,7 @@ public void run() {
publishConsumerPausedEvent(this.consumer.assignment());
}
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
this.lastPoll = System.currentTimeMillis();
if (this.consumerPaused && !isPaused()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,40 @@ public void testNonResponsiveConsumerEvent() throws Exception {
container.stop();
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testNonResponsiveConsumerEventNotIssuedWithActiveConsumer() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(isNull(), eq(""), isNull())).willReturn(consumer);
ConsumerRecords records = new ConsumerRecords(Collections.emptyMap());
CountDownLatch latch = new CountDownLatch(20);
given(consumer.poll(anyLong())).willAnswer(i -> {
Thread.sleep(100);
latch.countDown();
return records;
});
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
containerProps.setNoPollThreshold(2.0f);
containerProps.setPollTimeout(100);
containerProps.setMonitorInterval(1);
containerProps.setMessageListener(mock(MessageListener.class));
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
final AtomicInteger eventCounter = new AtomicInteger();
container.setApplicationEventPublisher(e -> {
if (e instanceof NonResponsiveConsumerEvent) {
eventCounter.incrementAndGet();
}
});
container.start();
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
assertThat(eventCounter.get()).isEqualTo(0);
}

@Test
public void testBatchAck() throws Exception {
logger.info("Start batch ack");
Expand Down

0 comments on commit d408c27

Please sign in to comment.