Skip to content

Commit bbd2b99

Browse files
garyrussellartembilan
authored andcommitted
GH-834: Remove transactional producers
Resolves #834 To solve the zombie fencing problem there is a producer for each group/topic/partition. Close these producers when a partition is revoked or the container stopped. **cherry-pick to 2.1.x, 2.0.x** I will backport to 1.3.x (without lambdas etc) after review/merge. * checkstyle # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
1 parent 4037ca5 commit bbd2b99

File tree

4 files changed

+62
-10
lines changed

4 files changed

+62
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
8484

8585
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
8686

87-
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
87+
private final Map<String, CloseSafeProducer<K, V>> consumerProducers = new HashMap<>();
8888

8989
private volatile CloseSafeProducer<K, V> producer;
9090

@@ -189,8 +189,7 @@ public void destroy() throws Exception { //NOSONAR
189189
}
190190
synchronized (this.consumerProducers) {
191191
this.consumerProducers.forEach(
192-
(k, v) -> ((CloseSafeProducer<K, V>) v).delegate
193-
.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
192+
(k, v) -> v.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
194193
this.consumerProducers.clear();
195194
}
196195
}
@@ -255,7 +254,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
255254
else {
256255
synchronized (this.consumerProducers) {
257256
if (!this.consumerProducers.containsKey(suffix)) {
258-
Producer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
257+
CloseSafeProducer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
259258
this.consumerProducers.put(suffix, newProducer);
260259
return newProducer;
261260
}
@@ -268,7 +267,7 @@ private Producer<K, V> createTransactionalProducerForPartition() {
268267

269268
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
270269
synchronized (this.consumerProducers) {
271-
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
270+
Iterator<Entry<String, CloseSafeProducer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
272271
while (iterator.hasNext()) {
273272
if (iterator.next().getValue().equals(producer)) {
274273
iterator.remove();
@@ -294,7 +293,7 @@ protected Producer<K, V> createTransactionalProducer() {
294293
}
295294
}
296295

297-
private Producer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
296+
private CloseSafeProducer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
298297
Producer<K, V> producer;
299298
Map<String, Object> configs = new HashMap<>(this.configs);
300299
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
@@ -307,6 +306,18 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
307306
return this.cache;
308307
}
309308

309+
@Override
310+
public void closeProducerFor(String transactionIdSuffix) {
311+
if (this.producerPerConsumerPartition) {
312+
synchronized (this.consumerProducers) {
313+
CloseSafeProducer<K, V> removed = this.consumerProducers.remove(transactionIdSuffix);
314+
if (removed != null) {
315+
removed.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS);
316+
}
317+
}
318+
}
319+
}
320+
310321
/**
311322
* A wrapper class for the delegate.
312323
*

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,27 @@
2828
*/
2929
public interface ProducerFactory<K, V> {
3030

31+
/**
32+
* Create a producer.
33+
* @return the producer.
34+
*/
3135
Producer<K, V> createProducer();
3236

37+
/**
38+
* Return true if the factory supports transactions.
39+
* @return true if transactional.
40+
*/
3341
default boolean transactionCapable() {
3442
return false;
3543
}
3644

45+
/**
46+
* Remove the specified producer from the cache and close it.
47+
* @param transactionIdSuffix the producer's transaction id suffix.
48+
* @since 1.3.8
49+
*/
50+
default void closeProducerFor(String transactionIdSuffix) {
51+
// NOSONAR
52+
}
53+
3754
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
5151
import org.springframework.kafka.KafkaException;
5252
import org.springframework.kafka.core.ConsumerFactory;
5353
import org.springframework.kafka.core.KafkaResourceHolder;
54+
import org.springframework.kafka.core.ProducerFactory;
5455
import org.springframework.kafka.core.ProducerFactoryUtils;
5556
import org.springframework.kafka.event.ListenerContainerIdleEvent;
5657
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
@@ -480,6 +481,9 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
480481
if (this.consumerAwareListener != null) {
481482
this.consumerAwareListener.onPartitionsRevokedAfterCommit(consumer, partitions);
482483
}
484+
if (ListenerConsumer.this.kafkaTxManager != null) {
485+
closeProducers(partitions);
486+
}
483487
}
484488

485489
@Override
@@ -677,6 +681,9 @@ public void run() {
677681
// No-op. Continue process
678682
}
679683
}
684+
else {
685+
closeProducers(getAssignedPartitions());
686+
}
680687
}
681688
else {
682689
ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
@@ -904,8 +911,8 @@ private void innvokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
904911
this.logger.trace("Processing " + record);
905912
}
906913
try {
907-
TransactionSupport.setTransactionIdSuffix(
908-
this.consumerGroupId + "." + record.topic() + "." + record.partition());
914+
TransactionSupport
915+
.setTransactionIdSuffix(zombieFenceTxIdSuffix(record.topic(), record.partition()));
909916
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
910917

911918
@Override
@@ -1251,6 +1258,23 @@ public void seekToEnd(String topic, int partition) {
12511258
this.seeks.add(new TopicPartitionInitialOffset(topic, partition, SeekPosition.END));
12521259
}
12531260

1261+
private void closeProducers(Collection<TopicPartition> partitions) {
1262+
ProducerFactory<?, ?> producerFactory = this.kafkaTxManager.getProducerFactory();
1263+
partitions.forEach(tp -> {
1264+
try {
1265+
producerFactory.closeProducerFor(zombieFenceTxIdSuffix(tp.topic(), tp.partition()));
1266+
}
1267+
catch (Exception e) {
1268+
this.logger.error("Failed to close producer with transaction id suffix: "
1269+
+ zombieFenceTxIdSuffix(tp.topic(), tp.partition()), e);
1270+
}
1271+
});
1272+
}
1273+
1274+
private String zombieFenceTxIdSuffix(String topic, int partition) {
1275+
return this.consumerGroupId + "." + topic + "." + partition;
1276+
}
1277+
12541278
private final class ConsumerAcknowledgment implements Acknowledgment {
12551279

12561280
private final ConsumerRecord<K, V> record;

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
443443
assertThat(records.count()).isEqualTo(0);
444444
assertThat(consumer.position(new TopicPartition(topic1, 0))).isEqualTo(1);
445445
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
446+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
446447
logger.info("Stop testRollbackRecord");
447448
pf.destroy();
448-
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
449449
consumer.close();
450450
}
451451

0 commit comments

Comments
 (0)