Skip to content

Commit 60f92c8

Browse files
garyrussellartembilan
authored andcommitted
Add ChainedKafkaTransactionManager
Enable transaction synchronization without the need for user code to send the offset(s) to the transaction.
1 parent b25e144 commit 60f92c8

File tree

7 files changed

+137
-11
lines changed

7 files changed

+137
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@
6262
import org.springframework.kafka.support.LogIfLevelEnabled;
6363
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6464
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
65-
import org.springframework.kafka.transaction.KafkaTransactionManager;
65+
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6666
import org.springframework.scheduling.SchedulingAwareRunnable;
6767
import org.springframework.scheduling.TaskScheduler;
6868
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -361,9 +361,9 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
361361
private final PlatformTransactionManager transactionManager = this.containerProperties.getTransactionManager();
362362

363363
@SuppressWarnings("rawtypes")
364-
private final KafkaTransactionManager kafkaTxManager =
365-
this.transactionManager instanceof KafkaTransactionManager
366-
? ((KafkaTransactionManager) this.transactionManager) : null;
364+
private final KafkaAwareTransactionManager kafkaTxManager =
365+
this.transactionManager instanceof KafkaAwareTransactionManager
366+
? ((KafkaAwareTransactionManager) this.transactionManager) : null;
367367

368368
private final TransactionTemplate transactionTemplate;
369369

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.transaction;
18+
19+
import org.springframework.data.transaction.ChainedTransactionManager;
20+
import org.springframework.kafka.core.ProducerFactory;
21+
import org.springframework.transaction.PlatformTransactionManager;
22+
import org.springframework.util.Assert;
23+
24+
/**
25+
* A {@link ChainedTransactionManager} that has exactly one
26+
* {@link KafkaAwareTransactionManager} in the chain.
27+
*
28+
* @param <K> the key type.
29+
* @param <V> the value type.
30+
*
31+
* @author Gary Russell
32+
* @since 2.1.3
33+
*
34+
*/
35+
public class ChainedKafkaTransactionManager<K, V> extends ChainedTransactionManager implements KafkaAwareTransactionManager<K, V> {
36+
37+
private final KafkaAwareTransactionManager<K, V> kafkaTransactionManager;
38+
39+
@SuppressWarnings("unchecked")
40+
public ChainedKafkaTransactionManager(PlatformTransactionManager... transactionManagers) {
41+
super(transactionManagers);
42+
KafkaAwareTransactionManager<K, V> kafkaTransactionManager = null;
43+
for (PlatformTransactionManager tm : transactionManagers) {
44+
if (tm instanceof KafkaAwareTransactionManager) {
45+
Assert.isNull(kafkaTransactionManager, "Only one KafkaAwareTransactionManager is allowed");
46+
kafkaTransactionManager = (KafkaTransactionManager<K, V>) tm;
47+
}
48+
}
49+
Assert.notNull(kafkaTransactionManager, "Exactly one KafkaAwareTransactionManager is required");
50+
this.kafkaTransactionManager = kafkaTransactionManager;
51+
}
52+
53+
@Override
54+
public ProducerFactory<K, V> getProducerFactory() {
55+
return this.kafkaTransactionManager.getProducerFactory();
56+
}
57+
58+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.transaction;
18+
19+
import org.springframework.kafka.core.ProducerFactory;
20+
21+
/**
22+
* A transaction manager that can provide a {@link ProducerFactory}.
23+
*
24+
* @param <K> the key type.
25+
* @param <V> the value type.
26+
*
27+
* @author Gary Russell
28+
* @since 2.1.3
29+
*
30+
*/
31+
public interface KafkaAwareTransactionManager<K, V> {
32+
33+
/**
34+
* Get the producer factory.
35+
* @return the producerFactory
36+
*/
37+
ProducerFactory<K, V> getProducerFactory();
38+
39+
}

spring-kafka/src/main/java/org/springframework/kafka/transaction/KafkaTransactionManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -66,7 +66,7 @@
6666
*/
6767
@SuppressWarnings("serial")
6868
public class KafkaTransactionManager<K, V> extends AbstractPlatformTransactionManager
69-
implements ResourceTransactionManager {
69+
implements ResourceTransactionManager, KafkaAwareTransactionManager<K, V> {
7070

7171
private final ProducerFactory<K, V> producerFactory;
7272

@@ -88,6 +88,7 @@ public KafkaTransactionManager(ProducerFactory<K, V> producerFactory) {
8888
* Get the producer factory.
8989
* @return the producerFactory
9090
*/
91+
@Override
9192
public ProducerFactory<K, V> getProducerFactory() {
9293
return this.producerFactory;
9394
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -64,7 +64,9 @@
6464
import org.springframework.kafka.listener.config.ContainerProperties;
6565
import org.springframework.kafka.test.rule.KafkaEmbedded;
6666
import org.springframework.kafka.test.utils.KafkaTestUtils;
67+
import org.springframework.kafka.transaction.ChainedKafkaTransactionManager;
6768
import org.springframework.kafka.transaction.KafkaTransactionManager;
69+
import org.springframework.transaction.PlatformTransactionManager;
6870
import org.springframework.transaction.TransactionDefinition;
6971
import org.springframework.transaction.TransactionException;
7072
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
@@ -88,9 +90,18 @@ public class TransactionalContainerTests {
8890
@ClassRule
8991
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(3, true, topic1, topic2);
9092

91-
@SuppressWarnings({ "rawtypes", "unchecked" })
9293
@Test
93-
public void testConsumeAndProduceTransaction() throws Exception {
94+
public void testConsumeAndProduceTransactionKTM() throws Exception {
95+
testConsumeAndProduceTransactionGuts(false);
96+
}
97+
98+
@Test
99+
public void testConsumeAndProduceTransactionKCTM() throws Exception {
100+
testConsumeAndProduceTransactionGuts(true);
101+
}
102+
103+
@SuppressWarnings({ "rawtypes", "unchecked" })
104+
private void testConsumeAndProduceTransactionGuts(boolean chained) throws Exception {
94105
Consumer consumer = mock(Consumer.class);
95106
final TopicPartition topicPartition = new TopicPartition("foo", 0);
96107
willAnswer(i -> {
@@ -122,9 +133,13 @@ public void testConsumeAndProduceTransaction() throws Exception {
122133
given(pf.transactionCapable()).willReturn(true);
123134
given(pf.createProducer()).willReturn(producer);
124135
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
136+
PlatformTransactionManager ptm = tm;
137+
if (chained) {
138+
ptm = new ChainedKafkaTransactionManager(new SomeOtherTransactionManager(), tm);
139+
}
125140
ContainerProperties props = new ContainerProperties("foo");
126141
props.setGroupId("group");
127-
props.setTransactionManager(tm);
142+
props.setTransactionManager(ptm);
128143
final KafkaTemplate template = new KafkaTemplate(pf);
129144
props.setMessageListener((MessageListener) m -> {
130145
template.send("bar", "baz");

src/reference/asciidoc/kafka.adoc

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ If the listener throws an exception, the transaction is rolled back and the cons
280280
If you need to synchronize a Kafka transaction with some other transaction; simply configure the listener container with the appropriate transaction manager (one that supports synchronization, such as the `DataSourceTransactionManager`).
281281
Any operations performed on a **transactional** `KafkaTemplate` from the listener will participate in a single transaction.
282282
The Kafka transaction will be committed (or rolled back) immediately after the controlling transaction.
283-
Before exiting the listener, you should invoke one of the template's `sendOffsetsToTransaction` methods.
283+
Before exiting the listener, you should invoke one of the template's `sendOffsetsToTransaction` methods (unless you use a <<chained-transaction-manager, `ChainedKafkaTransactionManager` - see below>>).
284284
For convenience, the listener container binds its consumer group id to the thread so, generally, you can use the first method:
285285

286286
[source, java]
@@ -317,6 +317,15 @@ NOTE: The offset to be committed is one greater than the offset of the record(s)
317317
IMPORTANT: This should only be called when using transaction synchronization.
318318
When a listener container is configured to use a `KafkaTransactionManager`, it will take care of sending the offsets to the transaction.
319319

320+
[[chained-transaction-manager]]
321+
====== ChainedKafkaTransactionManager
322+
323+
The `ChainedKafkaTransactionManager` was introduced in _version 2.1.3_.
324+
This is a subclass of `ChainedTransactionManager` that can have exactly one `KafkaTransactionManager`.
325+
Since it is a `KafkaAwareTransactionManager`, the container can send the offsets to the transaction in the same way as when the container is configured with a simple `KafkaTransactionManager`.
326+
This provides another mechanism for synchronizing transactions without having to send the offsets to the transaction in the listener code.
327+
Chain your transaction managers in the desired order and provide the `ChainedTransactionManager` in the `ContainerProperties`.
328+
320329
====== KafkaTemplate Local Transactions
321330

322331
You can use the `KafkaTemplate` to execute a series of operations within a local transaction.

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ See <<class-level-kafkalistener>> for more information.
4242
Starting with _version 2.1.3_, a subclass of `KafkaTemplate` is provided to support request/reply semantics.
4343
See <<replying-template>> for more information.
4444

45+
==== ChainedKafkaTransactionManager
46+
47+
_version 2.1.3_ introduced the `ChainedKafkaTransactionManager` see <<chained-transaction-manager>> for more information.
48+
4549
==== Migration Guide from 2.0
4650

4751
https://github.com/spring-projects/spring-kafka/wiki/Spring-for-Apache-Kafka-2.0-to-2.1-Migration-Guide[2.0 to 2.1 Migration].

0 commit comments

Comments
 (0)