Skip to content

Commit 6bd4694

Browse files
garyrussellartembilan
authored andcommitted
GH-859: Fix nested transactions
Resolves #859 When using `executeInTransaction` on a transactional container thread, we cannot use the existing transaction - clear the TL to allow a new Producer to be allocated. Invalid state transition (in-trans to in-trans). **cherry-pick to 2.1.x, 2.0.x, backport needed for 1.3.x** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java
1 parent addfaca commit 6bd4694

File tree

4 files changed

+88
-1
lines changed

4 files changed

+88
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,16 @@ public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition
154154
this.producerPerConsumerPartition = producerPerConsumerPartition;
155155
}
156156

157+
/**
158+
* Return the producerPerConsumerPartition.
159+
* @return the producerPerConsumerPartition.
160+
* @since 1.3.8
161+
*/
162+
@Override
163+
public boolean isProducerPerConsumerPartition() {
164+
return this.producerPerConsumerPartition;
165+
}
166+
157167
/**
158168
* Return an unmodifiable reference to the configuration map for this factory.
159169
* Useful for cloning to make a similar factory.
@@ -246,7 +256,7 @@ protected Producer<K, V> createKafkaProducer() {
246256
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
247257
}
248258

249-
private Producer<K, V> createTransactionalProducerForPartition() {
259+
Producer<K, V> createTransactionalProducerForPartition() {
250260
String suffix = TransactionSupport.getTransactionIdSuffix();
251261
if (suffix == null) {
252262
return createTransactionalProducer();

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.springframework.kafka.support.LoggingProducerListener;
3535
import org.springframework.kafka.support.ProducerListener;
3636
import org.springframework.kafka.support.SendResult;
37+
import org.springframework.kafka.support.TransactionSupport;
3738
import org.springframework.kafka.support.converter.MessageConverter;
3839
import org.springframework.kafka.support.converter.MessagingMessageConverter;
3940
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -236,6 +237,15 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
236237
Assert.state(this.transactional, "Producer factory does not support transactions");
237238
Producer<K, V> producer = this.producers.get();
238239
Assert.state(producer == null, "Nested calls to 'executeInTransaction' are not allowed");
240+
String transactionIdSuffix;
241+
if (this.producerFactory.isProducerPerConsumerPartition()) {
242+
transactionIdSuffix = TransactionSupport.getTransactionIdSuffix();
243+
TransactionSupport.clearTransactionIdSuffix();
244+
}
245+
else {
246+
transactionIdSuffix = null;
247+
}
248+
239249
producer = this.producerFactory.createProducer();
240250

241251
try {
@@ -265,6 +275,9 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
265275
producer.commitTransaction();
266276
}
267277
finally {
278+
if (transactionIdSuffix != null) {
279+
TransactionSupport.setTransactionIdSuffix(transactionIdSuffix);
280+
}
268281
this.producers.remove();
269282
closeProducer(producer, false);
270283
}

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,13 @@ default void closeProducerFor(String transactionIdSuffix) {
5151
// NOSONAR
5252
}
5353

54+
/**
55+
* Return the producerPerConsumerPartition.
56+
* @return the producerPerConsumerPartition.
57+
* @since 1.3.8
58+
*/
59+
default boolean isProducerPerConsumerPartition() {
60+
return false;
61+
}
62+
5463
}

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@
2828
import static org.springframework.kafka.test.assertj.KafkaConditions.key;
2929
import static org.springframework.kafka.test.assertj.KafkaConditions.value;
3030

31+
import java.util.Collections;
3132
import java.util.Iterator;
3233
import java.util.Map;
3334
import java.util.concurrent.BlockingQueue;
35+
import java.util.concurrent.atomic.AtomicBoolean;
3436

3537
import org.apache.kafka.clients.consumer.Consumer;
3638
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -51,6 +53,7 @@
5153
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
5254
import org.springframework.context.annotation.Bean;
5355
import org.springframework.context.annotation.Configuration;
56+
import org.springframework.kafka.support.TransactionSupport;
5457
import org.springframework.kafka.support.transaction.ResourcelessTransactionManager;
5558
import org.springframework.kafka.test.rule.KafkaEmbedded;
5659
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -234,6 +237,58 @@ public void testTransactionSynchronizationExceptionOnCommit() {
234237
assertThat(producer.closed()).isTrue();
235238
}
236239

240+
@Test
241+
public void testExcecuteInTransactionNewInnerTx() {
242+
@SuppressWarnings("unchecked")
243+
Producer<Object, Object> producer1 = mock(Producer.class);
244+
@SuppressWarnings("unchecked")
245+
Producer<Object, Object> producer2 = mock(Producer.class);
246+
producer1.initTransactions();
247+
AtomicBoolean first = new AtomicBoolean(true);
248+
249+
DefaultKafkaProducerFactory<Object, Object> pf = new DefaultKafkaProducerFactory<Object, Object>(
250+
Collections.emptyMap()) {
251+
252+
@Override
253+
protected Producer<Object, Object> createTransactionalProducer() {
254+
return first.getAndSet(false) ? producer1 : producer2;
255+
}
256+
257+
@Override
258+
Producer<Object, Object> createTransactionalProducerForPartition() {
259+
return createTransactionalProducer();
260+
}
261+
262+
};
263+
pf.setTransactionIdPrefix("tx.");
264+
265+
KafkaTemplate<Object, Object> template = new KafkaTemplate<>(pf);
266+
template.setDefaultTopic(STRING_KEY_TOPIC);
267+
268+
KafkaTransactionManager<Object, Object> tm = new KafkaTransactionManager<>(pf);
269+
270+
try {
271+
TransactionSupport.setTransactionIdSuffix("testExcecuteInTransactionNewInnerTx");
272+
new TransactionTemplate(tm).execute(s -> {
273+
return template.executeInTransaction(t -> {
274+
template.sendDefault("foo", "bar");
275+
return null;
276+
});
277+
});
278+
279+
InOrder inOrder = inOrder(producer1, producer2);
280+
inOrder.verify(producer1).beginTransaction();
281+
inOrder.verify(producer2).beginTransaction();
282+
inOrder.verify(producer2).commitTransaction();
283+
inOrder.verify(producer2).close();
284+
inOrder.verify(producer1).commitTransaction();
285+
inOrder.verify(producer1).close();
286+
}
287+
finally {
288+
TransactionSupport.clearTransactionIdSuffix();
289+
}
290+
}
291+
237292
@Configuration
238293
@EnableTransactionManagement
239294
public static class DeclarativeConfig {

0 commit comments

Comments
 (0)