Skip to content

Commit 2ef00e5

Browse files
authored
GH-2565 Fix Resume Partition When Unassigned
Resolves #2565 Previously, when a partition was resumed while not currently assigned, it would be paused indefinitely when reassigned. This was caused by the `ConcurrentMessageListenerContainer` only resuming partitions in child containers if that partition is currently assigned to the container, leaving the pause request in place. Use the `isPartitionPauseRequested()` method on the child container instead when deciding whether to resume the partition. **cherry-pick to 2.9.x**
1 parent f1f41e9 commit 2ef00e5

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2022 the original author or authors.
2+
* Copyright 2015-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -374,7 +374,7 @@ public void resumePartition(TopicPartition topicPartition) {
374374
synchronized (this.lifecycleMonitor) {
375375
this.containers
376376
.stream()
377-
.filter(container -> containsPartition(topicPartition, container))
377+
.filter(container -> container.isPartitionPauseRequested(topicPartition))
378378
.forEach(container -> container.resumePartition(topicPartition));
379379
}
380380
}

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import static org.mockito.BDDMockito.willAnswer;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.times;
2728
import static org.mockito.Mockito.verify;
2829

2930
import java.time.Duration;
@@ -844,6 +845,60 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr
844845
}
845846
}
846847

848+
@SuppressWarnings({ "unchecked", "rawtypes" })
849+
@Test
850+
void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedException {
851+
Consumer consumer = mock(Consumer.class);
852+
CountDownLatch pollLatch = new CountDownLatch(1);
853+
willAnswer(inv -> {
854+
pollLatch.countDown();
855+
Thread.sleep(50);
856+
return ConsumerRecords.empty();
857+
}).given(consumer).poll(any());
858+
CountDownLatch pauseLatch = new CountDownLatch(1);
859+
willAnswer(inv -> {
860+
pauseLatch.countDown();
861+
return null;
862+
}).given(consumer).pause(any());
863+
TopicPartition tp0 = new TopicPartition("foo", 0);
864+
List<TopicPartition> assignments = Arrays.asList(tp0);
865+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
866+
willAnswer(invocation -> {
867+
rebal.set(invocation.getArgument(1));
868+
rebal.get().onPartitionsAssigned(assignments);
869+
return null;
870+
}).given(consumer).subscribe(any(Collection.class), any());
871+
ConsumerFactory cf = mock(ConsumerFactory.class);
872+
given(cf.createConsumer(any(), any(), any(), any())).willReturn(consumer);
873+
given(cf.getConfigurationProperties())
874+
.willReturn(Collections.singletonMap(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"));
875+
ContainerProperties containerProperties = new ContainerProperties("foo");
876+
containerProperties.setGroupId("grp");
877+
containerProperties.setMessageListener((MessageListener) rec -> { });
878+
containerProperties.setMissingTopicsFatal(false);
879+
containerProperties.setAssignmentCommitOption(AssignmentCommitOption.LATEST_ONLY);
880+
ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer(cf,
881+
containerProperties);
882+
container.start();
883+
assertThat(pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
884+
container.pausePartition(tp0);
885+
KafkaMessageListenerContainer child = (KafkaMessageListenerContainer) KafkaTestUtils
886+
.getPropertyValue(container, "containers", List.class).get(0);
887+
assertThat(child.isPartitionPauseRequested(tp0)).isTrue();
888+
assertThat(pauseLatch.await(10, TimeUnit.SECONDS)).isTrue();
889+
rebal.get().onPartitionsRevoked(assignments);
890+
assertThat(child.isPartitionPauseRequested(tp0)).isTrue();
891+
// immediate pause when re-assigned
892+
rebal.get().onPartitionsAssigned(assignments);
893+
verify(consumer, times(2)).pause(any());
894+
rebal.get().onPartitionsRevoked(assignments);
895+
// resume partition while unassigned
896+
container.resumePartition(tp0);
897+
assertThat(child.isPartitionPauseRequested(tp0)).isFalse();
898+
rebal.get().onPartitionsAssigned(assignments);
899+
verify(consumer, times(2)).pause(any()); // no immediate pause this time
900+
}
901+
847902
public static class TestMessageListener1 implements MessageListener<String, String>, ConsumerSeekAware {
848903

849904
private static ThreadLocal<ConsumerSeekCallback> callbacks = new ThreadLocal<>();

0 commit comments

Comments
 (0)