Skip to content

Commit

Permalink
SIKGH-198: Add pause/resume to Listener Containers
Browse files Browse the repository at this point in the history
  • Loading branch information
garyrussell authored and artembilan committed Feb 21, 2018
1 parent 6a41127 commit 1540092
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public enum AckMode {

private volatile boolean running = false;

private volatile boolean paused;

protected AbstractMessageListenerContainer(ContainerProperties containerProperties) {
Assert.notNull(containerProperties, "'containerProperties' cannot be null");

Expand Down Expand Up @@ -187,6 +189,10 @@ public boolean isRunning() {
return this.running;
}

protected boolean isPaused() {
return this.paused;
}

public void setPhase(int phase) {
this.phase = phase;
}
Expand Down Expand Up @@ -241,6 +247,16 @@ public void run() {
}
}

@Override
public void pause() {
this.paused = true;
}

@Override
public void resume() {
this.paused = false;
}

@Override
public void stop(Runnable callback) {
synchronized (this.lifecycleMonitor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,18 @@ public void run() {
}
}

@Override
public void pause() {
super.pause();
this.containers.forEach(c -> c.pause());
}

@Override
public void resume() {
super.resume();
this.containers.forEach(c -> c.resume());
}

@Override
public String toString() {
return "ConcurrentMessageListenerContainer [concurrency=" + this.concurrency + ", beanName="
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private boolean taskSchedulerExplicitlySet;

private boolean consumerPaused;

@SuppressWarnings("unchecked")
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
Assert.state(!this.isAnyManualAck || !this.autoCommit,
Expand Down Expand Up @@ -655,7 +657,21 @@ public void run() {
processCommits();
}
processSeeks();
if (!this.consumerPaused && isPaused()) {
this.consumer.pause(this.consumer.assignment());
this.consumerPaused = true;
if (this.logger.isDebugEnabled()) {
this.logger.debug("Paused consumption from: " + this.consumer.paused());
}
}
ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
if (this.consumerPaused && !isPaused()) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Resuming consumption from: " + this.consumer.paused());
}
this.consumer.resume(this.consumer.paused());
this.consumerPaused = false;
}
if (records != null && this.logger.isDebugEnabled()) {
this.logger.debug("Received: " + records.count() + " records");
if (records.count() > 0 && this.logger.isTraceEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,20 @@ default Collection<TopicPartition> getAssignedPartitions() {
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
}

/**
* Pause this container before the next poll().
* @since 2.1.3
*/
default void pause() {
throw new UnsupportedOperationException("This container doesn't support pause");
}

/**
* Resume this container, if paused, after the next poll().
* @since 2.1.3
*/
default void resume() {
throw new UnsupportedOperationException("This container doesn't support resume");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1621,6 +1622,59 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
logger.info("Stop rebalance after failed record");
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testPauseResume() throws Exception {
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
Consumer<Integer, String> consumer = mock(Consumer.class);
given(cf.createConsumer(isNull(), eq("clientId"), isNull())).willReturn(consumer);
final Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new HashMap<>();
records.put(new TopicPartition("foo", 0), Arrays.asList(
new ConsumerRecord<>("foo", 0, 0L, 1, "foo"),
new ConsumerRecord<>("foo", 0, 1L, 1, "bar")));
ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
AtomicBoolean first = new AtomicBoolean(true);
given(consumer.poll(anyLong())).willAnswer(i -> {
Thread.sleep(50);
return first.getAndSet(false) ? consumerRecords : emptyRecords;
});
final CountDownLatch commitLatch = new CountDownLatch(2);
willAnswer(i -> {
commitLatch.countDown();
return null;
}).given(consumer).commitSync(any(Map.class));
given(consumer.assignment()).willReturn(records.keySet());
final CountDownLatch pauseLatch = new CountDownLatch(1);
willAnswer(i -> {
pauseLatch.countDown();
return null;
}).given(consumer).pause(records.keySet());
given(consumer.paused()).willReturn(records.keySet());
final CountDownLatch resumeLatch = new CountDownLatch(1);
willAnswer(i -> {
resumeLatch.countDown();
return null;
}).given(consumer).resume(records.keySet());
TopicPartitionInitialOffset[] topicPartition = new TopicPartitionInitialOffset[] {
new TopicPartitionInitialOffset("foo", 0) };
ContainerProperties containerProps = new ContainerProperties(topicPartition);
containerProps.setAckMode(AckMode.RECORD);
containerProps.setClientId("clientId");
containerProps.setIdleEventInterval(100L);
containerProps.setMessageListener((MessageListener) r -> { });
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
container.start();
assertThat(commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
verify(consumer, times(2)).commitSync(any(Map.class));
container.pause();
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.resume();
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
container.stop();
}

private Consumer<?, ?> spyOnConsumer(KafkaMessageListenerContainer<Integer, String> container) {
Consumer<?, ?> consumer = spy(
KafkaTestUtils.getPropertyValue(container, "listenerConsumer.consumer", Consumer.class));
Expand Down
10 changes: 10 additions & 0 deletions src/reference/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,16 @@ You can also perform seek operations from `onIdleContainer()` when an idle conta

To arbitrarily seek at runtime, use the callback reference from the `registerSeekCallback` for the appropriate thread.

[[pause-resume]]
==== Pausing/Resuming Listener Containers

_Version 2.1.3_ added `pause()` and `resume()` methods to listener containers.
Previously, you could pause a consumer within a `ConsumerAwareMessageListener` and resume it by listening for `ListenerContainerIdleEvent` s, which provide access to the `Consumer` object.
While you could pause a consumer in an idle container via an event listener, in some cases this was not thread-safe since there is no guarantee that the event listener is invoked on the consumer thread.
To safely pause/resume consumers, you should use the methods on the listener containers.
`pause()` takes effect just before the next `poll()`; `resume` takes effect, just after the current `poll()` returns.
When a container is paused, it continues to `poll()` the consumer, avoiding a rebalance if group management is being used, but will not retrieve any records; refer to the Kafka documentation for more information.

[[serdes]]
==== Serialization/Deserialization and Message Conversion

Expand Down
5 changes: 5 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ See <<serdes>> for more information.
Container Error handlers are now provided for both record and batch listeners that treat any exceptions thrown by the listener as fatal; they stop the container.
See <<annotation-error-handling>> for more information.

==== Pausing/Resuming Containers

The listener containers now have `pause()` and `resume()` methods (since _version 2.1.3_).
See <<pause-resume>> for more information.

==== Stateful Retry

Starting with _version 2.1.3_, stateful retry can be configured; see <<stateful-retry>> for more information.
Expand Down

0 comments on commit 1540092

Please sign in to comment.