Skip to content

Commit 2b78b42

Browse files
garyrussellartembilan
authored andcommitted
GH-800: Fix Zombie Fencing
Resolves #800 Fix assignment of `transactional.id` to be consistent across consumers. **cherry-pick to all versions >= 1.3.x**
1 parent 76f7ed7 commit 2b78b42

File tree

6 files changed

+155
-9
lines changed

6 files changed

+155
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@
1818

1919
import java.util.Collections;
2020
import java.util.HashMap;
21+
import java.util.Iterator;
2122
import java.util.List;
2223
import java.util.Map;
24+
import java.util.Map.Entry;
2325
import java.util.concurrent.BlockingQueue;
2426
import java.util.concurrent.Future;
2527
import java.util.concurrent.LinkedBlockingQueue;
2628
import java.util.concurrent.TimeUnit;
2729
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.function.Consumer;
2831

2932
import org.apache.commons.logging.Log;
3033
import org.apache.commons.logging.LogFactory;
@@ -48,6 +51,7 @@
4851
import org.springframework.context.ApplicationContextAware;
4952
import org.springframework.context.ApplicationListener;
5053
import org.springframework.context.event.ContextStoppedEvent;
54+
import org.springframework.kafka.support.TransactionSupport;
5155
import org.springframework.lang.Nullable;
5256
import org.springframework.util.Assert;
5357

@@ -91,6 +95,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
9195

9296
private final BlockingQueue<CloseSafeProducer<K, V>> cache = new LinkedBlockingQueue<>();
9397

98+
private final Map<String, Producer<K, V>> consumerProducers = new HashMap<>();
99+
94100
private volatile CloseSafeProducer<K, V> producer;
95101

96102
private Serializer<K> keySerializer;
@@ -103,6 +109,8 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
103109

104110
private ApplicationContext applicationContext;
105111

112+
private boolean producerPerConsumerPartition = true;
113+
106114
/**
107115
* Construct a factory with the provided configuration.
108116
* @param configs the configuration.
@@ -170,6 +178,17 @@ private void enableIdempotentBehaviour() {
170178
}
171179
}
172180

181+
/**
182+
* Set to false to revert to the previous behavior of a simple incrementing
183+
* trasactional.id suffix for each producer instead of maintaining a producer
184+
* for each group/topic/partition.
185+
* @param producerPerConsumerPartition false to revert.
186+
* @since 1.3.7
187+
*/
188+
public void setProducerPerConsumerPartition(boolean producerPerConsumerPartition) {
189+
this.producerPerConsumerPartition = producerPerConsumerPartition;
190+
}
191+
173192
/**
174193
* Return an unmodifiable reference to the configuration map for this factory.
175194
* Useful for cloning to make a similar factory.
@@ -203,6 +222,11 @@ public void destroy() throws Exception { //NOSONAR
203222
}
204223
producer = this.cache.poll();
205224
}
225+
synchronized (this.consumerProducers) {
226+
this.consumerProducers.forEach(
227+
(k, v) -> ((CloseSafeProducer<K, V>) v).delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS));
228+
this.consumerProducers.clear();
229+
}
206230
}
207231

208232
@Override
@@ -258,7 +282,12 @@ public boolean isRunning() {
258282
@Override
259283
public Producer<K, V> createProducer() {
260284
if (this.transactionIdPrefix != null) {
261-
return createTransactionalProducer();
285+
if (this.producerPerConsumerPartition) {
286+
return createTransactionalProducerForPartition();
287+
}
288+
else {
289+
return createTransactionalProducer();
290+
}
262291
}
263292
if (this.producer == null) {
264293
synchronized (this) {
@@ -279,6 +308,37 @@ protected Producer<K, V> createKafkaProducer() {
279308
return new KafkaProducer<K, V>(this.configs, this.keySerializer, this.valueSerializer);
280309
}
281310

311+
private Producer<K, V> createTransactionalProducerForPartition() {
312+
String suffix = TransactionSupport.getTransactionIdSuffix();
313+
if (suffix == null) {
314+
return createTransactionalProducer();
315+
}
316+
else {
317+
synchronized (this.consumerProducers) {
318+
if (!this.consumerProducers.containsKey(suffix)) {
319+
Producer<K, V> newProducer = doCreateTxProducer(suffix, this::removeConsumerProducer);
320+
this.consumerProducers.put(suffix, newProducer);
321+
return newProducer;
322+
}
323+
else {
324+
return this.consumerProducers.get(suffix);
325+
}
326+
}
327+
}
328+
}
329+
330+
private void removeConsumerProducer(CloseSafeProducer<K, V> producer) {
331+
synchronized (this.consumerProducers) {
332+
Iterator<Entry<String, Producer<K, V>>> iterator = this.consumerProducers.entrySet().iterator();
333+
while (iterator.hasNext()) {
334+
if (iterator.next().getValue().equals(producer)) {
335+
iterator.remove();
336+
break;
337+
}
338+
}
339+
}
340+
}
341+
282342
/**
283343
* Subclasses must return a producer from the {@link #getCache()} or a
284344
* new raw producer wrapped in a {@link CloseSafeProducer}.
@@ -288,18 +348,22 @@ protected Producer<K, V> createKafkaProducer() {
288348
protected Producer<K, V> createTransactionalProducer() {
289349
Producer<K, V> producer = this.cache.poll();
290350
if (producer == null) {
291-
Map<String, Object> configs = new HashMap<>(this.configs);
292-
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
293-
this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
294-
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
295-
producer.initTransactions();
296-
return new CloseSafeProducer<K, V>(producer, this.cache);
351+
return doCreateTxProducer("" + this.transactionIdSuffix.getAndIncrement(), null);
297352
}
298353
else {
299354
return producer;
300355
}
301356
}
302357

358+
private Producer<K, V> doCreateTxProducer(String suffix, Consumer<CloseSafeProducer<K, V>> remover) {
359+
Producer<K, V> producer;
360+
Map<String, Object> configs = new HashMap<>(this.configs);
361+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, this.transactionIdPrefix + suffix);
362+
producer = new KafkaProducer<K, V>(configs, this.keySerializer, this.valueSerializer);
363+
producer.initTransactions();
364+
return new CloseSafeProducer<K, V>(producer, this.cache, remover);
365+
}
366+
303367
protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
304368
return this.cache;
305369
}
@@ -317,16 +381,24 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
317381

318382
private final BlockingQueue<CloseSafeProducer<K, V>> cache;
319383

384+
private final Consumer<CloseSafeProducer<K, V>> removeConsumerProducer;
385+
320386
private volatile boolean txFailed;
321387

322388
CloseSafeProducer(Producer<K, V> delegate) {
323-
this(delegate, null);
389+
this(delegate, null, null);
324390
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
325391
}
326392

327393
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache) {
394+
this(delegate, cache, null);
395+
}
396+
397+
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
398+
Consumer<CloseSafeProducer<K, V>> removeConsumerProducer) {
328399
this.delegate = delegate;
329400
this.cache = cache;
401+
this.removeConsumerProducer = removeConsumerProducer;
330402
}
331403

332404
@Override
@@ -406,6 +478,9 @@ public void close() {
406478
+ "broker restarted during transaction");
407479

408480
this.delegate.close();
481+
if (this.removeConsumerProducer != null) {
482+
this.removeConsumerProducer.accept(this);
483+
}
409484
}
410485
else {
411486
synchronized (this) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.springframework.kafka.support.LogIfLevelEnabled;
6868
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6969
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
70+
import org.springframework.kafka.support.TransactionSupport;
7071
import org.springframework.kafka.support.serializer.DeserializationException;
7172
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
7273
import org.springframework.scheduling.SchedulingAwareRunnable;
@@ -1057,6 +1058,8 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
10571058
this.logger.trace("Processing " + record);
10581059
}
10591060
try {
1061+
TransactionSupport.setTransactionIdSuffix(
1062+
this.consumerGroupId + "." + record.topic() + "." + record.partition());
10601063
this.transactionTemplate.execute(new TransactionCallbackWithoutResult() {
10611064

10621065
@Override
@@ -1083,6 +1086,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
10831086
}
10841087
getAfterRollbackProcessor().process(unprocessed, this.consumer, e, true);
10851088
}
1089+
finally {
1090+
TransactionSupport.clearTransactionIdSuffix();
1091+
}
10861092
}
10871093
}
10881094

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support;
18+
19+
/**
20+
* Utilities for supporting transactions.
21+
*
22+
* @author Gary Russell
23+
* @since 1.3.7
24+
*
25+
*/
26+
public final class TransactionSupport {
27+
28+
private static final ThreadLocal<String> transactionIdSuffix = new ThreadLocal<>();
29+
30+
private TransactionSupport() {
31+
super();
32+
}
33+
34+
public static void setTransactionIdSuffix(String suffix) {
35+
transactionIdSuffix.set(suffix);
36+
}
37+
38+
public static String getTransactionIdSuffix() {
39+
return transactionIdSuffix.get();
40+
}
41+
42+
public static void clearTransactionIdSuffix() {
43+
transactionIdSuffix.remove();
44+
}
45+
46+
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
6969
import org.springframework.kafka.core.KafkaTemplate;
7070
import org.springframework.kafka.core.ProducerFactory;
71+
import org.springframework.kafka.core.ProducerFactoryUtils;
7172
import org.springframework.kafka.event.ConsumerStoppedEvent;
7273
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;
7374
import org.springframework.kafka.support.KafkaHeaders;
@@ -393,6 +394,7 @@ public void testConsumeAndProduceTransactionExternalTM() throws Exception {
393394
verify(pf).createProducer();
394395
}
395396

397+
@SuppressWarnings("unchecked")
396398
@Test
397399
public void testRollbackRecord() throws Exception {
398400
logger.info("Start testRollbackRecord");
@@ -413,6 +415,7 @@ public void testRollbackRecord() throws Exception {
413415
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
414416
final AtomicBoolean failed = new AtomicBoolean();
415417
final CountDownLatch latch = new CountDownLatch(3);
418+
final AtomicReference<String> transactionalId = new AtomicReference<>();
416419
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
417420
latch.countDown();
418421
if (failed.compareAndSet(false, true)) {
@@ -424,6 +427,9 @@ public void testRollbackRecord() throws Exception {
424427
if (message.topic().equals(topic1)) {
425428
template.send(topic2, "bar");
426429
template.flush();
430+
transactionalId.set(KafkaTestUtils.getPropertyValue(
431+
ProducerFactoryUtils.getTransactionalResourceHolder(pf).getProducer(),
432+
"delegate.transactionManager.transactionalId", String.class));
427433
}
428434
});
429435

@@ -466,8 +472,10 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
466472
assertThat(records.count()).isEqualTo(0);
467473
// depending on timing, the position might include the offset representing the commit in the log
468474
assertThat(consumer.position(new TopicPartition(topic1, 0))).isGreaterThanOrEqualTo(1L);
475+
assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
469476
logger.info("Stop testRollbackRecord");
470477
pf.destroy();
478+
assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
471479
consumer.close();
472480
}
473481

src/reference/asciidoc/kafka.adoc

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,12 @@ Spring for Apache Kafka adds support in several ways.
256256
Transactions are enabled by providing the `DefaultKafkaProducerFactory` with a `transactionIdPrefix`.
257257
In that case, instead of managing a single shared `Producer`, the factory maintains a cache of transactional producers.
258258
When the user `close()` s a producer, it is returned to the cache for reuse instead of actually being closed.
259-
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer.
259+
The `transactional.id` property of each producer is `transactionIdPrefix` + `n`, where `n` starts with `0` and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener.
260+
In that case, the `transactional.id` is `<transactionIdPrefix>.<group.id>.<topic>.<partition>`; this is to properly support fencing zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].
261+
This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0.
262+
If you wish to revert to the previous behavior, set the `producerPerConsumerPartition` property on the `DefaultKafkaProducerFactory` to `false`.
263+
264+
NOTE: While transactions are supported with batch listeners, zombie fencing cannot be supported because a batch may contain records from multiple topics/partitions.
260265

261266
====== KafkaTransactionManager
262267

src/reference/asciidoc/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,9 @@ See <<serdes>> for more information.
7979
The streams configuration bean must now be a simple `Properties` object instead of a `StreamsConfig`.
8080

8181
See <<kafka-streams>> for more information.
82+
83+
84+
==== Transactional Id
85+
86+
When a transaction is started by the listener container, the `transactional.id` is now the `transactionIdPrefix` appended with `<group.id>.<topic>.<partition>`.
87+
This is to allow proper fencing of zombies https://www.confluent.io/blog/transactions-apache-kafka/[as described here].

0 commit comments

Comments
 (0)