Skip to content

Commit a8d8497

Browse files
committed
GH-1591: Apply BackOff in DARP with Batch Listener
Resolves #1591

`BackOff` was ignored for batch listeners (when the exception is not recoverable).

- Move the back-off code from `SeekToCurrentBatchErrorHandler` to `ListenerUtils`.
- Call it from both `SeekToCurrentBatchErrorHandler` and `DefaultAfterRollbackProcessor`.
- Also clear the thread state (in both) if a batch fails and subsequently succeeds.

**I will do the backports - conflicts are expected**
1 parent 743d7d8 commit a8d8497

File tree

8 files changed

+120
-29
lines changed

8 files changed

+120
-29
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.support.SeekUtils;
3030
import org.springframework.lang.Nullable;
3131
import org.springframework.util.backoff.BackOff;
32+
import org.springframework.util.backoff.BackOffExecution;
3233
import org.springframework.util.backoff.FixedBackOff;
3334

3435
/**
@@ -49,6 +50,12 @@
4950
*/
5051
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor implements AfterRollbackProcessor<K, V> {
5152

53+
private final ThreadLocal<BackOffExecution> backOffs = new ThreadLocal<>(); // Intentionally not static
54+
55+
private final ThreadLocal<Long> lastIntervals = new ThreadLocal<>(); // Intentionally not static
56+
57+
private final BackOff backOff;
58+
5259
private KafkaTemplate<K, V> kafkaTemplate;
5360

5461
/**
@@ -119,6 +126,7 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
119126

120127
// Remove super CTOR when this is removed.
121128
super(recoverer, maxFailures);
129+
this.backOff = maxFailuresToBackOff(maxFailures);
122130
}
123131

124132
/**
@@ -132,6 +140,7 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
132140
BackOff backOff) {
133141

134142
super(recoverer, backOff);
143+
this.backOff = backOff;
135144
}
136145

137146
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -147,6 +156,10 @@ && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTra
147156
Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
148157
new OffsetAndMetadata(skipped.offset() + 1)));
149158
}
159+
160+
if (!recoverable && this.backOff != null) {
161+
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals);
162+
}
150163
}
151164

152165
@Override
@@ -196,4 +209,11 @@ public void setKafkaTemplate(KafkaTemplate<K, V> kafkaTemplate) {
196209
this.kafkaTemplate = kafkaTemplate;
197210
}
198211

212+
@Override
213+
public void clearThreadState() {
214+
super.clearThreadState();
215+
this.backOffs.remove();
216+
this.lastIntervals.remove();
217+
}
218+
199219
}

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -71,7 +71,7 @@ protected FailedRecordProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>, Excep
7171
this.classifier = configureDefaultClassifier();
7272
}
7373

74-
private static FixedBackOff maxFailuresToBackOff(int maxFailures) {
74+
protected static FixedBackOff maxFailuresToBackOff(int maxFailures) {
7575
if (maxFailures < 0) {
7676
return new FixedBackOff(0L, FixedBackOff.UNLIMITED_ATTEMPTS);
7777
}

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ default void handle(Exception thrownException, T data, Consumer<?, ?> consumer)
5353
* @since 2.3
5454
*/
5555
default void clearThreadState() {
56-
// NOSONAR
5756
}
5857

5958
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
603603

604604
private Producer<?, ?> producer;
605605

606+
private boolean batchFailed;
607+
606608
private volatile boolean consumerPaused;
607609

608610
private volatile Collection<TopicPartition> assignedPartitions;
@@ -1465,6 +1467,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
14651467
try {
14661468
invokeBatchOnMessage(records, recordList, producer);
14671469
successTimer(sample);
1470+
if (this.batchFailed) {
1471+
this.batchFailed = false;
1472+
this.batchErrorHandler.clearThreadState();
1473+
getAfterRollbackProcessor().clearThreadState();
1474+
}
14681475
}
14691476
catch (RuntimeException e) {
14701477
failureTimer(sample);
@@ -1476,6 +1483,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
14761483
throw e;
14771484
}
14781485
try {
1486+
this.batchFailed = true;
14791487
invokeBatchErrorHandler(records, recordList, e);
14801488
// unlikely, but possible, that a batch error handler "handles" the error
14811489
if ((!acked && !this.autoCommit && this.batchErrorHandler.isAckAfterHandle()) || producer != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
3333
import org.springframework.lang.Nullable;
3434
import org.springframework.util.Assert;
35+
import org.springframework.util.backoff.BackOff;
36+
import org.springframework.util.backoff.BackOffExecution;
3537

3638
/**
3739
* Listener utilities.
@@ -131,4 +133,40 @@ public static String recordToString(ConsumerRecord<?, ?> record) {
131133
}
132134
}
133135

136+
/**
137+
* Sleep according to the {@link BackOff}; when the {@link BackOffExecution} returns
138+
* {@link BackOffExecution#STOP} sleep for the previous backOff.
139+
* @param backOff the {@link BackOff} to create a new {@link BackOffExecution}.
140+
* @param executions a thread local containing the {@link BackOffExecution} for this
141+
* thread.
142+
* @param lastIntervals a thread local containing the previous {@link BackOff}
143+
* interval for this thread.
144+
* @since 2.3.12
145+
*/
146+
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
147+
ThreadLocal<Long> lastIntervals) {
148+
149+
BackOffExecution backOffExecution = executions.get();
150+
if (backOffExecution == null) {
151+
backOffExecution = backOff.start();
152+
executions.set(backOffExecution);
153+
}
154+
Long interval = backOffExecution.nextBackOff();
155+
if (interval == BackOffExecution.STOP) {
156+
interval = lastIntervals.get();
157+
if (interval == null) {
158+
interval = Long.valueOf(0);
159+
}
160+
}
161+
lastIntervals.set(interval);
162+
if (interval > 0) {
163+
try {
164+
Thread.sleep(interval);
165+
}
166+
catch (@SuppressWarnings("unused") InterruptedException e) {
167+
Thread.currentThread().interrupt();
168+
}
169+
}
170+
}
171+
134172
}

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class SeekToCurrentBatchErrorHandler implements ContainerAwareBatchErrorH
3838

3939
private final ThreadLocal<BackOffExecution> backOffs = new ThreadLocal<>(); // Intentionally not static
4040

41-
private final ThreadLocal<Long> lastInterval = new ThreadLocal<>(); // Intentionally not static
41+
private final ThreadLocal<Long> lastIntervals = new ThreadLocal<>(); // Intentionally not static
4242

4343
private BackOff backOff;
4444

@@ -65,27 +65,7 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consum
6565
.forEach(consumer::seek);
6666

6767
if (this.backOff != null) {
68-
BackOffExecution backOffExecution = this.backOffs.get();
69-
if (backOffExecution == null) {
70-
backOffExecution = this.backOff.start();
71-
this.backOffs.set(backOffExecution);
72-
}
73-
Long interval = backOffExecution.nextBackOff();
74-
if (interval == BackOffExecution.STOP) {
75-
interval = this.lastInterval.get();
76-
if (interval == null) {
77-
interval = Long.valueOf(0);
78-
}
79-
}
80-
this.lastInterval.set(interval);
81-
if (interval > 0) {
82-
try {
83-
Thread.sleep(interval);
84-
}
85-
catch (@SuppressWarnings("unused") InterruptedException e) {
86-
Thread.currentThread().interrupt();
87-
}
88-
}
68+
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals);
8969
}
9070

9171
throw new KafkaException("Seek to current after exception", thrownException);
@@ -94,7 +74,7 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consum
9474
@Override
9575
public void clearThreadState() {
9676
this.backOffs.remove();
97-
this.lastInterval.remove();
77+
this.lastIntervals.remove();
9878
}
9979

10080
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.anyMap;
2122
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
2224
import static org.mockito.Mockito.inOrder;
2325
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.spy;
2428
import static org.mockito.Mockito.times;
2529
import static org.mockito.Mockito.verify;
2630

@@ -37,6 +41,9 @@
3741

3842
import org.springframework.kafka.core.KafkaTemplate;
3943
import org.springframework.kafka.support.serializer.DeserializationException;
44+
import org.springframework.util.backoff.BackOff;
45+
import org.springframework.util.backoff.BackOffExecution;
46+
import org.springframework.util.backoff.FixedBackOff;
4047

4148
/**
4249
* @author Gary Russell
@@ -46,7 +53,7 @@
4653
public class DefaultAfterRollbackProcessorTests {
4754

4855
@Test
49-
public void testClassifier() {
56+
void testClassifier() {
5057
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
5158
AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
5259
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>((r, t) -> {
@@ -86,4 +93,33 @@ public void testClassifier() {
8693
inOrder.verifyNoMoreInteractions();
8794
}
8895

96+
@Test
97+
void testBatchBackOff() {
98+
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
99+
BackOff backOff = spy(new FixedBackOff(0, 1));
100+
AtomicReference<BackOffExecution> execution = new AtomicReference<>();
101+
willAnswer(inv -> {
102+
BackOffExecution exec = spy((BackOffExecution) inv.callRealMethod());
103+
execution.set(exec);
104+
return exec;
105+
}).given(backOff).start();
106+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
107+
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(recoverer,
108+
backOff);
109+
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
110+
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
111+
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
112+
IllegalStateException illegalState = new IllegalStateException();
113+
@SuppressWarnings("unchecked")
114+
Consumer<String, String> consumer = mock(Consumer.class);
115+
processor.process(records, consumer, illegalState, false);
116+
processor.process(records, consumer, illegalState, false);
117+
verify(backOff, times(2)).start();
118+
verify(execution.get(), times(2)).nextBackOff();
119+
processor.clearThreadState();
120+
processor.process(records, consumer, illegalState, false);
121+
verify(backOff, times(3)).start();
122+
verify(recoverer, never()).accept(any(), any());
123+
}
124+
89125
}

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import static org.mockito.BDDMockito.willAnswer;
2727
import static org.mockito.Mockito.inOrder;
2828
import static org.mockito.Mockito.mock;
29+
import static org.mockito.Mockito.spy;
30+
import static org.mockito.Mockito.verify;
2931

3032
import java.time.Duration;
3133
import java.util.Arrays;
@@ -65,6 +67,7 @@
6567
import org.springframework.kafka.transaction.KafkaTransactionManager;
6668
import org.springframework.test.annotation.DirtiesContext;
6769
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
70+
import org.springframework.util.backoff.BackOff;
6871
import org.springframework.util.backoff.FixedBackOff;
6972

7073
/**
@@ -133,10 +136,17 @@ void testBackOff() {
133136
long t1 = System.currentTimeMillis();
134137
for (int i = 0; i < 10; i++) {
135138
assertThatThrownBy(() -> eh.handle(ex, crs, mock(Consumer.class), mock(MessageListenerContainer.class)))
136-
.isInstanceOf(KafkaException.class)
137-
.hasCause(ex);
139+
.isInstanceOf(KafkaException.class)
140+
.hasCause(ex);
138141
}
139142
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(100L);
143+
eh.clearThreadState();
144+
BackOff backOff = spy(new FixedBackOff(0L, 0L));
145+
eh.setBackOff(backOff);
146+
assertThatThrownBy(() -> eh.handle(ex, crs, mock(Consumer.class), mock(MessageListenerContainer.class)))
147+
.isInstanceOf(KafkaException.class)
148+
.hasCause(ex);
149+
verify(backOff).start();
140150
}
141151

142152
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments (0)