File tree Expand file tree Collapse file tree 1 file changed +3
-0
lines changed
spring-kafka/src/test/java/org/springframework/kafka/listener Expand file tree Collapse file tree 1 file changed +3
-0
lines changed Original file line number Diff line number Diff line change @@ -2772,12 +2772,14 @@ public void rePausePartitionAfterRebalance() throws Exception {
2772
2772
return null ;
2773
2773
}).given (consumer ).pause (any ());
2774
2774
given (consumer .paused ()).willReturn (pausedParts );
2775
+ CountDownLatch firstPoll = new CountDownLatch (1 );
2775
2776
given (consumer .poll (any (Duration .class ))).willAnswer (i -> {
2776
2777
if (paused .get ()) {
2777
2778
pauseLatch1 .countDown ();
2778
2779
// hold up the consumer thread while we revoke/assign partitions on the test thread
2779
2780
suspendConsumerThread .await (10 , TimeUnit .SECONDS );
2780
2781
}
2782
+ firstPoll .countDown ();
2781
2783
Thread .sleep (50 );
2782
2784
return ConsumerRecords .empty ();
2783
2785
});
@@ -2801,6 +2803,7 @@ public void rePausePartitionAfterRebalance() throws Exception {
2801
2803
new KafkaMessageListenerContainer <>(cf , containerProps );
2802
2804
container .start ();
2803
2805
InOrder inOrder = inOrder (consumer );
2806
+ assertThat (firstPoll .await (10 , TimeUnit .SECONDS )).isNotNull ();
2804
2807
container .pausePartition (tp0 );
2805
2808
container .pausePartition (tp1 );
2806
2809
assertThat (pauseLatch1 .await (10 , TimeUnit .SECONDS )).isTrue ();
You can’t perform that action at this time.
0 commit comments