Skip to content

Commit 9997ffd

Browse files
garyrussellartembilan
authored andcommitted
Refactor KafkaAwareTransactionManager
Sub-interface of `PlatformTransactionManager` to facilitate Boot auto-configuration. Deprecate `ResourceTransactionManager` inheritance.
1 parent b3f3894 commit 9997ffd

File tree

4 files changed

+78
-5
lines changed

4 files changed

+78
-5
lines changed

spring-kafka/src/main/java/org/springframework/kafka/transaction/ChainedKafkaTransactionManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,4 +55,18 @@ public ProducerFactory<K, V> getProducerFactory() {
5555
return this.kafkaTransactionManager.getProducerFactory();
5656
}
5757

58+
/**
59+
* Return the producer factory.
60+
* @return the producer factory.
61+
* @deprecated - in a future release {@link KafkaAwareTransactionManager} will not be
62+
* a sub interface of
63+
* {@link org.springframework.transaction.support.ResourceTransactionManager}.
64+
* TODO: Remove in 3.0
65+
*/
66+
@Deprecated
67+
@Override
68+
public Object getResourceFactory() {
69+
return getProducerFactory();
70+
}
71+
5872
}

spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaAwareTransactionManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717
package org.springframework.kafka.transaction;
1818

1919
import org.springframework.kafka.core.ProducerFactory;
20+
import org.springframework.transaction.support.ResourceTransactionManager;
2021

2122
/**
2223
* A transaction manager that can provide a {@link ProducerFactory}.
24+
* Currently a sub-interface of {@link ResourceTransactionManager}
25+
* for backwards compatibility.
2326
*
2427
* @param <K> the key type.
2528
* @param <V> the value type.
@@ -28,7 +31,7 @@
2831
* @since 2.1.3
2932
*
3033
*/
31-
public interface KafkaAwareTransactionManager<K, V> {
34+
public interface KafkaAwareTransactionManager<K, V> extends ResourceTransactionManager {
3235

3336
/**
3437
* Get the producer factory.

spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@
6666
*/
6767
@SuppressWarnings("serial")
6868
public class KafkaTransactionManager<K, V> extends AbstractPlatformTransactionManager
69-
implements ResourceTransactionManager, KafkaAwareTransactionManager<K, V> {
69+
implements KafkaAwareTransactionManager<K, V> {
7070

7171
private final ProducerFactory<K, V> producerFactory;
7272

@@ -93,6 +93,14 @@ public ProducerFactory<K, V> getProducerFactory() {
9393
return this.producerFactory;
9494
}
9595

96+
/**
97+
* Return the producer factory.
98+
* @return the producer factory.
99+
* @deprecated - in a future release {@link KafkaAwareTransactionManager} will
100+
* not be a sub interface of {@link ResourceTransactionManager}.
101+
* TODO: Remove in 3.0
102+
*/
103+
@Deprecated
96104
@Override
97105
public Object getResourceFactory() {
98106
return getProducerFactory();

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,11 @@
4747
import org.junit.runner.RunWith;
4848
import org.mockito.Mockito;
4949

50+
import org.springframework.beans.factory.ObjectProvider;
5051
import org.springframework.beans.factory.annotation.Autowired;
5152
import org.springframework.context.annotation.Bean;
5253
import org.springframework.context.annotation.Configuration;
54+
import org.springframework.context.annotation.Primary;
5355
import org.springframework.context.event.EventListener;
5456
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
5557
import org.springframework.core.convert.converter.Converter;
@@ -88,6 +90,9 @@
8890
import org.springframework.kafka.test.EmbeddedKafkaBroker;
8991
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
9092
import org.springframework.kafka.test.utils.KafkaTestUtils;
93+
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
94+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
95+
import org.springframework.kafka.transaction.KafkaTransactionManager;
9196
import org.springframework.lang.NonNull;
9297
import org.springframework.messaging.Message;
9398
import org.springframework.messaging.MessageHeaders;
@@ -173,6 +178,9 @@ public class EnableKafkaIntegrationTests {
173178
@Autowired
174179
private FooConverter fooConverter;
175180

181+
@Autowired
182+
private ConcurrentKafkaListenerContainerFactory<Integer, String> transactionalFactory;
183+
176184
@Test
177185
public void testAnonymous() {
178186
MessageListenerContainer container = this.registry
@@ -669,6 +677,12 @@ public void testReceivePollResults() throws Exception {
669677
assertThat(this.listener.consumerRecords.iterator().next().value()).isEqualTo("allRecords");
670678
}
671679

680+
@Test
681+
public void testAutoConfigTm() {
682+
assertThat(this.transactionalFactory.getContainerProperties().getTransactionManager())
683+
.isInstanceOf(ChainedKafkaTransactionManager.class);
684+
}
685+
672686
@Configuration
673687
@EnableKafka
674688
@EnableTransactionManagement(proxyTargetClass = true)
@@ -686,11 +700,23 @@ public PlatformTransactionManager transactionManager() {
686700
return Mockito.mock(PlatformTransactionManager.class);
687701
}
688702

703+
@Bean
704+
public KafkaTransactionManager<Integer, String> ktm() {
705+
return new KafkaTransactionManager<>(txProducerFactory());
706+
}
707+
708+
@Bean
709+
@Primary
710+
public ChainedKafkaTransactionManager<Integer, String> cktm() {
711+
return new ChainedKafkaTransactionManager<>(ktm(), transactionManager());
712+
}
713+
689714
private Throwable globalErrorThrowable;
690715

691716
@Bean
692717
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
693-
kafkaListenerContainerFactory() {
718+
kafkaListenerContainerFactory() {
719+
694720
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
695721
new ConcurrentKafkaListenerContainerFactory<>();
696722
factory.setConsumerFactory(consumerFactory());
@@ -705,10 +731,25 @@ public PlatformTransactionManager transactionManager() {
705731

706732
@Bean
707733
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
708-
withNoReplyTemplateContainerFactory() {
734+
withNoReplyTemplateContainerFactory() {
735+
736+
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
737+
new ConcurrentKafkaListenerContainerFactory<>();
738+
factory.setConsumerFactory(consumerFactory());
739+
return factory;
740+
}
741+
742+
@Bean
743+
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
744+
transactionalFactory(ObjectProvider<KafkaAwareTransactionManager<Integer, String>> tm) {
745+
709746
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
710747
new ConcurrentKafkaListenerContainerFactory<>();
711748
factory.setConsumerFactory(consumerFactory());
749+
KafkaAwareTransactionManager<Integer, String> ktm = tm.getIfUnique();
750+
if (ktm != null) {
751+
factory.getContainerProperties().setTransactionManager(ktm);
752+
}
712753
return factory;
713754
}
714755

@@ -926,6 +967,13 @@ public ProducerFactory<Integer, String> producerFactory() {
926967
return new DefaultKafkaProducerFactory<>(producerConfigs());
927968
}
928969

970+
@Bean
971+
public ProducerFactory<Integer, String> txProducerFactory() {
972+
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
973+
pf.setTransactionIdPrefix("tx-");
974+
return pf;
975+
}
976+
929977
@Bean
930978
public Map<String, Object> producerConfigs() {
931979
return KafkaTestUtils.producerProps(embeddedKafka);
@@ -1481,7 +1529,7 @@ public void listen(String foo) {
14811529
}
14821530

14831531
@KafkaListener(id = "ifctx", topics = "annotated9")
1484-
@Transactional
1532+
@Transactional(transactionManager = "transactionManager")
14851533
public void listenTx(String foo) {
14861534
latch2.countDown();
14871535
}

0 commit comments

Comments
 (0)