Skip to content

Commit e307771

Browse files
committed
Don't cache tx producers after reset()
Producers were incorrectly returned to the cache after a `reset()`. **I will back-port; conflicts expected**
1 parent 2b02987 commit e307771

File tree

2 files changed

+66
-16
lines changed

2 files changed

+66
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,6 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
127127

128128
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
129129

130-
private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();
131-
132130
private final AtomicInteger epoch = new AtomicInteger();
133131

134132
private final AtomicInteger clientIdCounter = new AtomicInteger();
@@ -402,25 +400,21 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
402400
}
403401
if (this.producerPerThread) {
404402
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
405-
if (this.threadBoundProducerEpochs.get() == null) {
406-
this.threadBoundProducerEpochs.set(this.epoch.get());
407-
}
408-
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
403+
if (tlProducer != null && this.epoch.get() != tlProducer.epoch) {
409404
closeThreadBoundProducer();
410405
tlProducer = null;
411406
}
412407
if (tlProducer == null) {
413408
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
414-
this.physicalCloseTimeout);
409+
this.physicalCloseTimeout, this.epoch);
415410
this.threadBoundProducers.set(tlProducer);
416-
this.threadBoundProducerEpochs.set(this.epoch.get());
417411
}
418412
return tlProducer;
419413
}
420414
synchronized (this) {
421415
if (this.producer == null) {
422416
this.producer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
423-
this.physicalCloseTimeout);
417+
this.physicalCloseTimeout, this.epoch);
424418
}
425419
return this.producer;
426420
}
@@ -527,7 +521,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
527521
newProducer = createRawProducer(newProducerConfigs);
528522
newProducer.initTransactions();
529523
return new CloseSafeProducer<>(newProducer, getCache(prefix), remover,
530-
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout);
524+
(String) newProducerConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG), this.physicalCloseTimeout,
525+
this.epoch);
531526
}
532527

533528
protected Producer<K, V> createRawProducer(Map<String, Object> configs) {
@@ -596,37 +591,57 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
596591

597592
private final Duration closeTimeout;
598593

594+
final int epoch; // NOSONAR
595+
596+
private final AtomicInteger factoryEpoch;
597+
599598
private volatile Exception producerFailed;
600599

601600
private volatile boolean closed;
602601

603602
CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer,
604603
Duration closeTimeout) {
605604

606-
this(delegate, null, removeProducer, null, closeTimeout);
605+
this(delegate, null, removeProducer, null, closeTimeout, new AtomicInteger());
606+
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
607+
}
608+
609+
CloseSafeProducer(Producer<K, V> delegate, Consumer<CloseSafeProducer<K, V>> removeProducer,
610+
Duration closeTimeout, AtomicInteger epoch) {
611+
612+
this(delegate, null, removeProducer, null, closeTimeout, epoch);
607613
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
608614
}
609615

610616
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
611617
Duration closeTimeout) {
612-
this(delegate, cache, null, closeTimeout);
618+
this(delegate, cache, null, null, closeTimeout, new AtomicInteger());
613619
}
614620

615621
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
616622
@Nullable Consumer<CloseSafeProducer<K, V>> removeConsumerProducer, Duration closeTimeout) {
617623

618-
this(delegate, cache, removeConsumerProducer, null, closeTimeout);
624+
this(delegate, cache, removeConsumerProducer, null, closeTimeout, new AtomicInteger());
619625
}
620626

621627
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
622628
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
623629
Duration closeTimeout) {
624630

631+
this(delegate, cache, removeProducer, txId, closeTimeout, new AtomicInteger());
632+
}
633+
634+
CloseSafeProducer(Producer<K, V> delegate, BlockingQueue<CloseSafeProducer<K, V>> cache,
635+
@Nullable Consumer<CloseSafeProducer<K, V>> removeProducer, @Nullable String txId,
636+
Duration closeTimeout, AtomicInteger epoch) {
637+
625638
this.delegate = delegate;
626639
this.cache = cache;
627640
this.removeProducer = removeProducer;
628641
this.txId = txId;
629642
this.closeTimeout = closeTimeout;
643+
this.epoch = epoch.get();
644+
this.factoryEpoch = epoch;
630645
LOGGER.debug(() -> "Created new Producer: " + this);
631646
}
632647

@@ -760,8 +775,8 @@ public void close(@Nullable Duration timeout) {
760775
else {
761776
if (this.cache != null && this.removeProducer == null) { // dedicated consumer producers are not cached
762777
synchronized (this) {
763-
if (!this.cache.contains(this)
764-
&& !this.cache.offer(this)) {
778+
if (this.epoch != this.factoryEpoch.get()
779+
|| (!this.cache.contains(this) && !this.cache.offer(this))) {
765780
this.closed = true;
766781
this.delegate.close(timeout);
767782
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,42 @@ protected Producer createTransactionalProducer(String txIdPrefix) {
174174

175175
@Test
176176
@SuppressWarnings({ "rawtypes", "unchecked" })
177-
void testThreadLocal() {
177+
void dontReturnToCacheAfterReset() {
178+
final Producer producer = mock(Producer.class);
179+
ApplicationContext ctx = mock(ApplicationContext.class);
180+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
181+
182+
@Override
183+
protected Producer createRawProducer(Map configs) {
184+
return producer;
185+
}
186+
187+
};
188+
pf.setApplicationContext(ctx);
189+
pf.setTransactionIdPrefix("foo");
190+
Producer aProducer = pf.createProducer();
191+
assertThat(aProducer).isNotNull();
192+
aProducer.close();
193+
Producer bProducer = pf.createProducer();
194+
assertThat(bProducer).isSameAs(aProducer);
195+
bProducer.close();
196+
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
197+
Map<?, ?> cache = KafkaTestUtils.getPropertyValue(pf, "cache", Map.class);
198+
assertThat(cache.size()).isEqualTo(1);
199+
Queue queue = (Queue) cache.get("foo");
200+
assertThat(queue.size()).isEqualTo(1);
201+
bProducer = pf.createProducer();
202+
assertThat(bProducer).isSameAs(aProducer);
203+
assertThat(queue.size()).isEqualTo(0);
204+
pf.reset();
205+
bProducer.close();
206+
assertThat(queue.size()).isEqualTo(0);
207+
pf.destroy();
208+
}
209+
210+
@Test
211+
@SuppressWarnings({ "rawtypes", "unchecked" })
212+
void testThreadLocal() throws InterruptedException {
178213
final Producer producer = mock(Producer.class);
179214
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
180215

0 commit comments

Comments
 (0)