Skip to content

Commit fd339c5

Browse files
antonio-tomacgaryrussell
authored andcommitted
GH-2738: Handle WakeupException in FallbackBatchEH
Resolves #2738 GH-2738: Pausing container results in WakeupException without retrying failed listener invocation GH-2738: fix code style, use BDDMockito instead of Mockito GH-2738: fix code style, alphabetical import order Increase awaitility timeouts.
1 parent 7fba1a3 commit fd339c5

File tree

2 files changed

+245
-2
lines changed

2 files changed

+245
-2
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.kafka.clients.consumer.ConsumerRecord;
2626
import org.apache.kafka.clients.consumer.ConsumerRecords;
2727
import org.apache.kafka.common.TopicPartition;
28+
import org.apache.kafka.common.errors.WakeupException;
2829

2930
import org.springframework.classify.BinaryExceptionClassifier;
3031
import org.springframework.core.log.LogAccessor;
@@ -38,6 +39,7 @@
3839
*
3940
* @author Gary Russell
4041
* @author Andrii Pelesh
42+
* @author Antonio Tomac
4143
* @since 2.8
4244
*
4345
*/
@@ -168,7 +170,13 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
168170
Exception lastException = unwrapIfNeeded(thrownException);
169171
Boolean retryable = classifier.classify(lastException);
170172
while (Boolean.TRUE.equals(retryable) && nextBackOff != BackOffExecution.STOP) {
171-
consumer.poll(Duration.ZERO);
173+
try {
174+
consumer.poll(Duration.ZERO);
175+
}
176+
catch (WakeupException we) {
177+
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
178+
throw new KafkaException("Woken up during retry", logLevel, we);
179+
}
172180
try {
173181
ListenerUtils.stoppableSleep(container, nextBackOff);
174182
}
@@ -180,7 +188,13 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
180188
if (!container.isRunning()) {
181189
throw new KafkaException("Container stopped during retries");
182190
}
183-
consumer.poll(Duration.ZERO);
191+
try {
192+
consumer.poll(Duration.ZERO);
193+
}
194+
catch (WakeupException we) {
195+
seeker.handleBatch(thrownException, records, consumer, container, () -> { });
196+
throw new KafkaException("Woken up during retry", logLevel, we);
197+
}
184198
try {
185199
invokeListener.run();
186200
return;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.awaitility.Awaitility.await;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.BDDMockito.willAnswer;
23+
import static org.mockito.Mockito.spy;
24+
25+
import java.time.Duration;
26+
import java.util.LinkedHashSet;
27+
import java.util.List;
28+
import java.util.Properties;
29+
import java.util.Set;
30+
import java.util.concurrent.atomic.AtomicBoolean;
31+
32+
import org.apache.commons.logging.LogFactory;
33+
import org.apache.kafka.clients.consumer.Consumer;
34+
import org.apache.kafka.clients.consumer.ConsumerRecords;
35+
import org.apache.kafka.clients.producer.Producer;
36+
import org.apache.kafka.clients.producer.ProducerRecord;
37+
import org.junit.jupiter.api.Test;
38+
39+
import org.springframework.beans.factory.annotation.Autowired;
40+
import org.springframework.context.annotation.Bean;
41+
import org.springframework.context.annotation.Configuration;
42+
import org.springframework.core.log.LogAccessor;
43+
import org.springframework.kafka.annotation.EnableKafka;
44+
import org.springframework.kafka.annotation.KafkaListener;
45+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
46+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
47+
import org.springframework.kafka.core.ConsumerFactory;
48+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
49+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
50+
import org.springframework.kafka.core.ProducerFactory;
51+
import org.springframework.kafka.support.Acknowledgment;
52+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
53+
import org.springframework.kafka.test.context.EmbeddedKafka;
54+
import org.springframework.kafka.test.utils.KafkaTestUtils;
55+
import org.springframework.test.annotation.DirtiesContext;
56+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
57+
import org.springframework.util.backoff.FixedBackOff;
58+
59+
/**
60+
* @author Antonio Tomac
61+
* @since 2.9
62+
*/
63+
@SpringJUnitConfig
64+
@DirtiesContext
65+
@EmbeddedKafka(topics = "foo", partitions = 1)
66+
public class PauseContainerWhileErrorHandlerIsRetryingTests {
67+
68+
private static final LogAccessor log = new LogAccessor(LogFactory.getLog(PauseContainerWhileErrorHandlerIsRetryingTests.class));
69+
70+
private static void log(String message) {
71+
log.error(message);
72+
}
73+
74+
@Autowired
75+
private Config setup;
76+
77+
@Test
78+
public void provokeRetriesTriggerPauseThenResume() throws InterruptedException {
79+
setup.produce(1, 2); //normally processed
80+
await("for first 2 records")
81+
.atMost(Duration.ofSeconds(10))
82+
.untilAsserted(() -> assertThat(setup.received).as("received").contains("1", "2"));
83+
assertThat(setup.processed).as("processed").contains("1", "2");
84+
85+
setup.triggerPause.set(true);
86+
log("enable listener throwing");
87+
setup.failing.set(true);
88+
setup.produce(3, 4, 5); //could loose those
89+
90+
await("for next 3 records")
91+
.atMost(Duration.ofSeconds(10))
92+
.untilAsserted(() -> assertThat(setup.received)
93+
.as("received")
94+
.hasSizeGreaterThan(2));
95+
assertThat(setup.processed).as("processed").hasSize(2);
96+
97+
setup.triggerPause.set(false);
98+
setup.resumeContainer();
99+
100+
log("disable listener throwing");
101+
setup.failing.set(false);
102+
setup.produce(6, 7, 8, 9);
103+
104+
await("for last 4 records")
105+
.atMost(Duration.ofSeconds(10))
106+
.untilAsserted(() -> assertThat(setup.received)
107+
.as("received - all")
108+
.contains("1", "2", "3", "4", "5", "6", "7", "8", "9"));
109+
assertThat(setup.processed)
110+
.as("processed all - not loosing 3, 4, 5")
111+
.contains("1", "2", "3", "4", "5", "6", "7", "8", "9");
112+
}
113+
114+
@Configuration
115+
@EnableKafka
116+
public static class Config {
117+
118+
@Autowired
119+
KafkaListenerEndpointRegistry registry;
120+
121+
@Autowired
122+
EmbeddedKafkaBroker embeddedKafkaBroker;
123+
124+
final Set<String> received = new LinkedHashSet<>();
125+
final Set<String> processed = new LinkedHashSet<>();
126+
127+
final AtomicBoolean failing = new AtomicBoolean(false);
128+
final AtomicBoolean triggerPause = new AtomicBoolean(false);
129+
130+
void resumeContainer() {
131+
log("resuming...");
132+
registry.getListenerContainer("id").resume(); //NOSONAR
133+
log("resumed");
134+
}
135+
136+
void pauseContainer() {
137+
log("pausing...");
138+
registry.getListenerContainer("id").pause(); //NOSONAR
139+
log("paused");
140+
}
141+
142+
void produce(int... records) {
143+
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
144+
try (Producer<Integer, String> producer = pf.createProducer()) {
145+
for (int record : records) {
146+
log("producing message: " + record);
147+
producer.send(new ProducerRecord<>("foo", record, Integer.toString(record)));
148+
}
149+
producer.flush();
150+
}
151+
}
152+
153+
154+
@KafkaListener(id = "id", groupId = "grp", topics = "foo")
155+
public void process(List<String> batch, Acknowledgment acknowledgment) {
156+
batch.forEach((msg) -> {
157+
if (!received.contains(msg)) {
158+
log("Got new message: " + msg);
159+
}
160+
received.add(msg);
161+
});
162+
received.addAll(batch);
163+
if (failing.get()) {
164+
throw new RuntimeException("ooops");
165+
}
166+
batch.forEach((msg) -> {
167+
if (!processed.contains(msg)) {
168+
log("Processed new message: " + msg);
169+
}
170+
processed.add(msg);
171+
});
172+
acknowledgment.acknowledge();
173+
}
174+
175+
/**
176+
* Call {@link #pauseContainer()} is timed during {@link KafkaMessageListenerContainer.ListenerConsumer#polling}
177+
* is being `true`, but after Consumer's check if it had been woken up.
178+
* Problem depends the fact that very next call {@link Consumer#poll(Duration)}
179+
* will throw {@link org.apache.kafka.common.errors.WakeupException}
180+
*/
181+
@SuppressWarnings({"rawtypes"})
182+
private Consumer makePausingAfterPollConsumer(Consumer delegate) {
183+
Consumer spied = spy(delegate);
184+
willAnswer((call) -> {
185+
Duration duration = call.getArgument(0, Duration.class);
186+
ConsumerRecords records = delegate.poll(duration);
187+
if (!duration.isZero() && triggerPause.get()) {
188+
pauseContainer();
189+
}
190+
return records;
191+
}).given(spied).poll(any());
192+
return spied;
193+
}
194+
195+
@SuppressWarnings({"rawtypes"})
196+
private ConsumerFactory makePausingAfterPollConsumerFactory(ConsumerFactory delegate) {
197+
ConsumerFactory spied = spy(delegate);
198+
willAnswer((invocation -> {
199+
Consumer consumerDelegate = delegate.createConsumer(
200+
invocation.getArgument(0, String.class),
201+
invocation.getArgument(1, String.class),
202+
invocation.getArgument(2, String.class),
203+
invocation.getArgument(3, Properties.class)
204+
);
205+
return makePausingAfterPollConsumer(consumerDelegate);
206+
})).given(spied).createConsumer(any(), any(), any(), any());
207+
return spied;
208+
}
209+
210+
@SuppressWarnings({"rawtypes", "unchecked"})
211+
@Bean
212+
ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
213+
DefaultKafkaConsumerFactory consumerFactory = new DefaultKafkaConsumerFactory(
214+
KafkaTestUtils.consumerProps("grp", "false", embeddedKafkaBroker)
215+
);
216+
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
217+
factory.setBatchListener(true);
218+
factory.setConsumerFactory(makePausingAfterPollConsumerFactory(consumerFactory));
219+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
220+
factory.getContainerProperties().setPollTimeoutWhilePaused(Duration.ZERO);
221+
DefaultErrorHandler eh = new DefaultErrorHandler(new FixedBackOff(100, Long.MAX_VALUE));
222+
eh.setSeekAfterError(true);
223+
factory.setCommonErrorHandler(eh);
224+
return factory;
225+
}
226+
227+
}
228+
229+
}

0 commit comments

Comments
 (0)