Skip to content

Commit 9c029f7

Browse files
garyrussellartembilan
authored andcommitted
Close thread bound producers after reset
**cherry-pick to 2.4.x, 2.3.x**
1 parent 0372214 commit 9c029f7

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>,
126126

127127
private final ThreadLocal<CloseSafeProducer<K, V>> threadBoundProducers = new ThreadLocal<>();
128128

129+
private final ThreadLocal<Integer> threadBoundProducerEpochs = new ThreadLocal<>();
130+
131+
private final AtomicInteger epoch = new AtomicInteger();
132+
129133
private final AtomicInteger clientIdCounter = new AtomicInteger();
130134

131135
private Supplier<Serializer<K>> keySerializerSupplier;
@@ -332,6 +336,7 @@ public void destroy() {
332336
(k, v) -> v.getDelegate().close(this.physicalCloseTimeout));
333337
this.consumerProducers.clear();
334338
}
339+
this.epoch.incrementAndGet();
335340
}
336341

337342
@Override
@@ -393,10 +398,18 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
393398
}
394399
if (this.producerPerThread) {
395400
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
401+
if (this.threadBoundProducerEpochs.get() == null) {
402+
this.threadBoundProducerEpochs.set(this.epoch.get());
403+
}
404+
if (tlProducer != null && this.epoch.get() != this.threadBoundProducerEpochs.get()) {
405+
closeThreadBoundProducer();
406+
tlProducer = null;
407+
}
396408
if (tlProducer == null) {
397409
tlProducer = new CloseSafeProducer<>(createKafkaProducer(), this::removeProducer,
398410
this.physicalCloseTimeout);
399411
this.threadBoundProducers.set(tlProducer);
412+
this.threadBoundProducerEpochs.set(this.epoch.get());
400413
}
401414
return tlProducer;
402415
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,35 @@ protected Producer createKafkaProducer() {
202202
verify(producer).close(any(Duration.class));
203203
}
204204

205+
@Test
206+
@SuppressWarnings({ "rawtypes", "unchecked" })
207+
void testThreadLocalReset() {
208+
Producer producer1 = mock(Producer.class);
209+
Producer producer2 = mock(Producer.class);
210+
ProducerFactory mockPf = mock(ProducerFactory.class);
211+
given(mockPf.createProducer()).willReturn(producer1, producer2);
212+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(new HashMap<>()) {
213+
214+
@Override
215+
protected Producer createKafkaProducer() {
216+
return mockPf.createProducer();
217+
}
218+
219+
};
220+
pf.setProducerPerThread(true);
221+
Producer aProducer = pf.createProducer();
222+
assertThat(aProducer).isNotNull();
223+
aProducer.close();
224+
Producer bProducer = pf.createProducer();
225+
assertThat(bProducer).isSameAs(aProducer);
226+
bProducer.close();
227+
pf.reset();
228+
bProducer = pf.createProducer();
229+
assertThat(bProducer).isNotSameAs(aProducer);
230+
bProducer.close();
231+
verify(producer1).close(any(Duration.class));
232+
}
233+
205234
@Test
206235
@SuppressWarnings({ "rawtypes", "unchecked" })
207236
void testCleanUpAfterTxFence() {

0 commit comments

Comments
 (0)