Skip to content

Commit e16ae4f

Browse files
garyrussellartembilan
authored andcommitted
SeekToCurrentErrorHandler and Recovery
Don't throw an exception if the record was skipped by invoking the recoverer. - Adds noise to the log since the record was recovered in some way.
1 parent 9997ffd commit e16ae4f

File tree

3 files changed

+47
-5
lines changed

3 files changed

+47
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exce
7878
@Override
7979
public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
8080
Consumer<?, ?> consumer, MessageListenerContainer container) {
81-
SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger);
82-
throw new KafkaException("Seek to current after exception", thrownException);
81+
if (!SeekUtils.doSeeks(records, consumer, thrownException, true, this.failureTracker::skip, logger)) {
82+
throw new KafkaException("Seek to current after exception", thrownException);
83+
}
8384
}
8485

8586
@Override

spring-kafka/src/main/java/org/springframework/kafka/support/SeekUtils.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,25 +53,39 @@ private SeekUtils() {
5353
* @param recoverable true if skipping the first record is allowed.
5454
* @param skipper function to determine whether or not to skip seeking the first.
5555
* @param logger a {@link Log} for seek errors.
56+
* @return true if the failed record was skipped.
5657
*/
57-
public static void doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
58+
public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, Exception exception,
5859
boolean recoverable, BiPredicate<ConsumerRecord<?, ?>, Exception> skipper, Log logger) {
5960
Map<TopicPartition, Long> partitions = new LinkedHashMap<>();
6061
AtomicBoolean first = new AtomicBoolean(true);
62+
AtomicBoolean skipped = new AtomicBoolean();
6163
records.forEach(record -> {
62-
if (!recoverable || !first.get() || !skipper.test(record, exception)) {
63-
partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()), offset -> record.offset());
64+
if (recoverable && first.get()) {
65+
skipped.set(skipper.test(record, exception));
66+
if (skipped.get() && logger.isDebugEnabled()) {
67+
logger.debug("Skipping seek of: " + record);
68+
}
69+
}
70+
if (!recoverable || !first.get() || !skipped.get()) {
71+
partitions.computeIfAbsent(new TopicPartition(record.topic(), record.partition()),
72+
offset -> record.offset());
6473
}
6574
first.set(false);
6675
});
76+
boolean tracing = logger.isTraceEnabled();
6777
partitions.forEach((topicPartition, offset) -> {
6878
try {
79+
if (tracing) {
80+
logger.trace("Seeking: " + topicPartition + " to: " + offset);
81+
}
6982
consumer.seek(topicPartition, offset);
7083
}
7184
catch (Exception e) {
7285
logger.error("Failed to seek " + topicPartition + " to " + offset, e);
7386
}
7487
});
88+
return skipped.get();
7589
}
7690

7791
}

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentRecovererTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,14 @@
1717
package org.springframework.kafka.listener;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.fail;
21+
import static org.mockito.Mockito.mock;
2022
import static org.mockito.Mockito.spy;
2123
import static org.mockito.Mockito.verify;
24+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2225

26+
import java.util.ArrayList;
27+
import java.util.List;
2328
import java.util.Map;
2429
import java.util.concurrent.CountDownLatch;
2530
import java.util.concurrent.TimeUnit;
@@ -33,6 +38,7 @@
3338
import org.junit.ClassRule;
3439
import org.junit.Test;
3540

41+
import org.springframework.kafka.KafkaException;
3642
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
3743
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
3844
import org.springframework.kafka.core.KafkaTemplate;
@@ -121,4 +127,25 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
121127
verify(errorHandler).clearThreadState();
122128
}
123129

130+
@Test
131+
public void seekToCurrentErrorHandlerRecovers() {
132+
SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler((r, e) -> { }, 2);
133+
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
134+
records.add(new ConsumerRecord<>("foo", 0, 0, null, "foo"));
135+
records.add(new ConsumerRecord<>("foo", 0, 1, null, "bar"));
136+
Consumer<?, ?> consumer = mock(Consumer.class);
137+
try {
138+
eh.handle(new RuntimeException(), records, consumer, null);
139+
fail("Expected exception");
140+
}
141+
catch (KafkaException e) {
142+
// NOSONAR
143+
}
144+
verify(consumer).seek(new TopicPartition("foo", 0), 0L);
145+
verifyNoMoreInteractions(consumer);
146+
eh.handle(new RuntimeException(), records, consumer, null);
147+
verify(consumer).seek(new TopicPartition("foo", 0), 1L);
148+
verifyNoMoreInteractions(consumer);
149+
}
150+
124151
}

0 commit comments

Comments
 (0)