Skip to content

Commit bd57099

Browse files
garyrussellartembilan
authored andcommitted
GH-2744: Fix Possible Deadlock in DKPF
Resolves #2744 Possible deadlock if `removeProducer` is called on the producer network thread. Move resetting the global shared producer to the creation logic. Also ensure the delegate of any thread-bound producers are closed. Add try/catch around the delegate close. **cherry-pick to 2.9.x** # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java
1 parent 8404ef5 commit bd57099

File tree

2 files changed

+19
-19
lines changed

2 files changed

+19
-19
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -751,6 +751,10 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
751751
return getOrCreateThreadBoundProducer();
752752
}
753753
synchronized (this) {
754+
if (this.producer != null && this.producer.closed) {
755+
this.producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
756+
this.producer = null;
757+
}
754758
if (this.producer != null && expire(this.producer)) {
755759
this.producer = null;
756760
}
@@ -765,7 +769,7 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
765769

766770
private Producer<K, V> getOrCreateThreadBoundProducer() {
767771
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.get();
768-
if (tlProducer != null && (this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
772+
if (tlProducer != null && (tlProducer.closed || this.epoch.get() != tlProducer.epoch || expire(tlProducer))) {
769773
closeThreadBoundProducer();
770774
tlProducer = null;
771775
}
@@ -834,21 +838,11 @@ private boolean removeConsumerProducer(CloseSafeProducer<K, V> producerToRemove,
834838
* Remove the single shared producer and a thread-bound instance if present.
835839
* @param producerToRemove the producer.
836840
* @param timeout the close timeout.
837-
* @return always true.
841+
* @return true if the producer was closed.
838842
* @since 2.2.13
839843
*/
840-
protected final synchronized boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
841-
if (producerToRemove.closed) {
842-
if (producerToRemove.equals(this.producer)) {
843-
this.producer = null;
844-
producerToRemove.closeDelegate(timeout, this.listeners);
845-
}
846-
this.threadBoundProducers.remove();
847-
return true;
848-
}
849-
else {
850-
return false;
851-
}
844+
protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
845+
return producerToRemove.closed;
852846
}
853847

854848
/**
@@ -1204,7 +1198,12 @@ public void close(@Nullable Duration timeout) {
12041198
}
12051199

12061200
void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {
1207-
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
1201+
try {
1202+
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
1203+
}
1204+
catch (Exception ex) {
1205+
LOGGER.warn(ex, () -> "Failed to close " + this.delegate);
1206+
}
12081207
listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));
12091208
this.closed = true;
12101209
}

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -331,11 +331,12 @@ protected Producer createKafkaProducer() {
331331
};
332332
final Producer aProducer = pf.createProducer();
333333
assertThat(aProducer).isNotNull();
334+
Producer bProducer = pf.createProducer();
335+
assertThat(bProducer).isSameAs(aProducer);
334336
aProducer.send(null, (meta, ex) -> { });
335337
aProducer.close(ProducerFactoryUtils.DEFAULT_CLOSE_TIMEOUT);
336-
assertThat(KafkaTestUtils.getPropertyValue(pf, "producer")).isNull();
338+
bProducer = pf.createProducer();
337339
verify(producer1).close(any(Duration.class));
338-
Producer bProducer = pf.createProducer();
339340
assertThat(bProducer).isNotSameAs(aProducer);
340341
}
341342

0 commit comments

Comments
 (0)