Skip to content

Commit 3cd6c9c

Browse files
authored
Allow configuring observation registry directly
This changes allows configuring the observation registry directly, instead of it being fetched from the application context. This is to allow observability when `KafkaTemplate/KafkaMessageListenerContainer` are used without an application context.
1 parent 019deb7 commit 3cd6c9c

File tree

3 files changed

+45
-11
lines changed

3 files changed

+45
-11
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@
103103
* @author Soby Chacko
104104
* @author Gurps Bassi
105105
* @author Valentina Armenise
106+
* @author Christian Fredriksson
106107
*/
107108
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
108109
ApplicationListener<ContextStoppedEvent>, DisposableBean, SmartInitializingSingleton {
@@ -456,6 +457,16 @@ public void setObservationConvention(KafkaTemplateObservationConvention observat
456457
this.observationConvention = observationConvention;
457458
}
458459

460+
/**
461+
* Configure the {@link ObservationRegistry} to use for recording observations.
462+
* @param observationRegistry the observation registry to use.
463+
* @since 3.3.1
464+
*/
465+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
466+
Assert.notNull(observationRegistry, "'observationRegistry' must not be null");
467+
this.observationRegistry = observationRegistry;
468+
}
469+
459470
/**
460471
* Return the {@link KafkaAdmin}, used to find the cluster id for observation, if
461472
* present.
@@ -479,8 +490,10 @@ public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
479490
@Override
480491
public void afterSingletonsInstantiated() {
481492
if (this.observationEnabled && this.applicationContext != null) {
482-
this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
483-
.getIfUnique(() -> this.observationRegistry);
493+
if (this.observationRegistry.isNoop()) {
494+
this.observationRegistry = this.applicationContext.getBeanProvider(ObservationRegistry.class)
495+
.getIfUnique(() -> this.observationRegistry);
496+
}
484497
if (this.kafkaAdmin == null) {
485498
this.kafkaAdmin = this.applicationContext.getBeanProvider(KafkaAdmin.class).getIfUnique();
486499
if (this.kafkaAdmin != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.function.Function;
2727
import java.util.regex.Pattern;
2828

29+
import io.micrometer.observation.ObservationRegistry;
2930
import org.aopalliance.aop.Advice;
3031
import org.apache.kafka.clients.consumer.ConsumerRecord;
3132

@@ -281,6 +282,8 @@ public enum EOSMode {
281282

282283
private boolean observationEnabled;
283284

285+
private ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
286+
284287
private Duration consumerStartTimeout = DEFAULT_CONSUMER_START_TIMEOUT;
285288

286289
private Boolean subBatchPerPartition;
@@ -716,6 +719,20 @@ public void setObservationEnabled(boolean observationEnabled) {
716719
this.observationEnabled = observationEnabled;
717720
}
718721

722+
public ObservationRegistry getObservationRegistry() {
723+
return this.observationRegistry;
724+
}
725+
726+
/**
727+
* Configure the {@link ObservationRegistry} to use for recording observations.
728+
* @param observationRegistry the observation registry to use.
729+
* @since 3.3.1
730+
*/
731+
public void setObservationRegistry(ObservationRegistry observationRegistry) {
732+
Assert.notNull(observationRegistry, "'observationRegistry' must not be null");
733+
this.observationRegistry = observationRegistry;
734+
}
735+
719736
/**
720737
* Set additional tags for the Micrometer listener timers.
721738
* @param tags the tags.
@@ -1118,6 +1135,9 @@ public String toString() {
11181135
+ (this.observationConvention != null
11191136
? "\n observationConvention=" + this.observationConvention
11201137
: "")
1138+
+ (this.observationRegistry != null
1139+
? "\n observationRegistry=" + this.observationRegistry
1140+
: "")
11211141
+ "\n restartAfterAuthExceptions=" + this.restartAfterAuthExceptions
11221142
+ "\n]";
11231143
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878

7979
import org.springframework.aop.support.AopUtils;
8080
import org.springframework.beans.BeanUtils;
81-
import org.springframework.beans.factory.ObjectProvider;
8281
import org.springframework.context.ApplicationContext;
8382
import org.springframework.context.ApplicationEventPublisher;
8483
import org.springframework.core.log.LogAccessor;
@@ -171,6 +170,7 @@
171170
* @author Borahm Lee
172171
* @author Lokesh Alamuri
173172
* @author Sanghyeok An
173+
* @author Christian Fredriksson
174174
*/
175175
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
176176
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -372,14 +372,15 @@ protected void doStart() {
372372
}
373373
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
374374
ListenerType listenerType = determineListenerType(listener);
375-
ObservationRegistry observationRegistry = ObservationRegistry.NOOP;
376-
ApplicationContext applicationContext = getApplicationContext();
377-
if (applicationContext != null && containerProperties.isObservationEnabled()) {
378-
ObjectProvider<ObservationRegistry> registry =
379-
applicationContext.getBeanProvider(ObservationRegistry.class);
380-
ObservationRegistry reg = registry.getIfUnique();
381-
if (reg != null) {
382-
observationRegistry = reg;
375+
ObservationRegistry observationRegistry = containerProperties.getObservationRegistry();
376+
if (observationRegistry.isNoop()) {
377+
ApplicationContext applicationContext = getApplicationContext();
378+
if (applicationContext != null && containerProperties.isObservationEnabled()) {
379+
ObservationRegistry reg = applicationContext.getBeanProvider(ObservationRegistry.class)
380+
.getIfUnique();
381+
if (reg != null) {
382+
observationRegistry = reg;
383+
}
383384
}
384385
}
385386
this.listenerConsumer = new ListenerConsumer(listener, listenerType, observationRegistry);

0 commit comments

Comments
 (0)