Skip to content

Commit 7fba1a3

Browse files
garyrussellartembilan
authored andcommitted
GH-2722: Only Consider interceptBeforeTx with TX
Resolves #2722 Prior to 2.8.x, `interceptBeforeTx` was default `false`; it is now `true`. However, it should only be considered if there is a `TransactionManager` present. This allows an interceptor (when no transactions) to throw an exception, causing error handling to be invoked. Also tested with user's reproducer. **cherry-pick to 2.9.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java
1 parent 86f94c5 commit 7fba1a3

File tree

3 files changed

+13
-8
lines changed

3 files changed

+13
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,8 @@ public interface BatchInterceptor<K, V> extends ThreadStateProcessor {
3737
/**
3838
* Perform some action on the records or return a different one. If null is returned
3939
* the records will be skipped. Invoked before the listener.
40+
* IMPORTANT: If transactions are being used, and this method throws an exception, it
41+
* cannot be used with the container's {@code interceptBeforeTx} property set to true.
4042
* @param records the records.
4143
* @param consumer the consumer.
4244
* @return the records or null.

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -673,24 +673,24 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
673673
private final Duration syncCommitTimeout;
674674

675675
private final RecordInterceptor<K, V> recordInterceptor =
676-
!isInterceptBeforeTx() && this.transactionManager != null
676+
!isInterceptBeforeTx() || this.transactionManager == null
677677
? getRecordInterceptor()
678678
: null;
679679

680680
private final RecordInterceptor<K, V> earlyRecordInterceptor =
681-
isInterceptBeforeTx() || this.transactionManager == null
681+
isInterceptBeforeTx() && this.transactionManager != null
682682
? getRecordInterceptor()
683683
: null;
684684

685685
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();
686686

687687
private final BatchInterceptor<K, V> batchInterceptor =
688-
!isInterceptBeforeTx() && this.transactionManager != null
688+
!isInterceptBeforeTx() || this.transactionManager == null
689689
? getBatchInterceptor()
690690
: null;
691691

692692
private final BatchInterceptor<K, V> earlyBatchInterceptor =
693-
isInterceptBeforeTx() || this.transactionManager == null
693+
isInterceptBeforeTx() && this.transactionManager != null
694694
? getBatchInterceptor()
695695
: null;
696696

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,8 +38,11 @@ public interface RecordInterceptor<K, V> extends ThreadStateProcessor {
3838
/**
3939
* Perform some action on the record or return a different one. If null is returned
4040
* the record will be skipped. Invoked before the listener. IMPORTANT; if this method
41-
* returns a different record, the topic, partition and offset must not be changed
42-
* to avoid undesirable side-effects.
41+
* returns a different record, the topic, partition and offset must not be changed to
42+
* avoid undesirable side-effects.
43+
* <p>
44+
* IMPORTANT: If transactions are being used, and this method throws an exception, it
45+
* cannot be used with the container's {@code interceptBeforeTx} property set to true.
4346
* @param record the record.
4447
* @return the record or null.
4548
* @deprecated in favor of {@link #intercept(ConsumerRecord, Consumer)} which will

0 commit comments

Comments
 (0)