Skip to content

Commit c783ad6

Browse files
varmenisesobychacko
authored andcommitted
GH-3402: Fix KafkaAdmin clusterId config with observability enabled
Fixes: #3402 Re-set clusterId after creating new KafkaAdmin to ensure proper configuration when observability is enabled and bootstrap supplier is not set. This addresses the issue where kafkaAdmin clusterId configuration was being ignored under specific conditions. **Auto-cherry-pick to `3.2.x` & `3.1.x`**
1 parent 7e8d325 commit c783ad6

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@
7171
*
7272
* @author Gary Russell
7373
* @author Artem Bilan
74+
* @author Adrian Gygax
75+
* @author Sanghyeok An
76+
* @author Valentina Armenise
7477
*
7578
* @since 1.3
7679
*/
@@ -208,6 +211,15 @@ public void setClusterId(String clusterId) {
208211
this.clusterId = clusterId;
209212
}
210213

214+
/**
215+
* Get the clusterId property.
216+
* @return the cluster id.
217+
* @since 3.1.8
218+
*/
219+
public String getClusterId() {
220+
return this.clusterId;
221+
}
222+
211223
@Override
212224
public Map<String, Object> getConfigurationProperties() {
213225
Map<String, Object> configs2 = new HashMap<>(this.configs);

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@
102102
* @author Thomas Strauß
103103
* @author Soby Chacko
104104
* @author Gurps Bassi
105+
* @author Valentina Armenise
105106
*/
106107
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
107108
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -486,13 +487,17 @@ public void afterSingletonsInstantiated() {
486487
if (this.kafkaAdmin != null) {
487488
Object producerServers = this.producerFactory.getConfigurationProperties()
488489
.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
489-
String adminServers = this.kafkaAdmin.getBootstrapServers();
490+
String adminServers = getAdminBootstrapAddress();
490491
if (!producerServers.equals(adminServers)) {
491492
Map<String, Object> props = new HashMap<>(this.kafkaAdmin.getConfigurationProperties());
492493
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerServers);
493494
int opTo = this.kafkaAdmin.getOperationTimeout();
495+
String clusterId = this.kafkaAdmin.getClusterId();
494496
this.kafkaAdmin = new KafkaAdmin(props);
495497
this.kafkaAdmin.setOperationTimeout(opTo);
498+
if (clusterId != null && !clusterId.isEmpty()) {
499+
this.kafkaAdmin.setClusterId(clusterId);
500+
}
496501
}
497502
}
498503
}
@@ -502,6 +507,21 @@ else if (this.micrometerEnabled) {
502507
}
503508
}
504509

510+
private String getAdminBootstrapAddress() {
511+
// Retrieve bootstrap servers from KafkaAdmin bootstrap supplier if available
512+
String adminServers = this.kafkaAdmin.getBootstrapServers();
513+
514+
// Fallback to configuration properties if bootstrap servers are not set
515+
if (adminServers == null) {
516+
adminServers = this.kafkaAdmin.getConfigurationProperties().getOrDefault(
517+
AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
518+
""
519+
).toString();
520+
}
521+
522+
return adminServers;
523+
}
524+
505525
@Nullable
506526
private String clusterId() {
507527
if (this.kafkaAdmin != null && this.clusterId == null) {

0 commit comments

Comments
 (0)