diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java index 27c6024966..a56b98c53d 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java @@ -24,6 +24,8 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Metric; +import java.util.concurrent.ScheduledExecutorService; + /** * Kafka Client metrics binder. This should be closed on application shutdown to clean up * resources. @@ -43,6 +45,21 @@ @NonNullFields public class KafkaClientMetrics extends KafkaMetrics { + /** + * Kafka {@link Producer} metrics binder. The lifecycle of the custom scheduler passed + * is the responsibility of the caller. It will not be shut down when this instance is + * {@link #close() closed}. A scheduler can be shared among multiple instances of + * {@link KafkaClientMetrics} to reduce resource usage by reducing the number of + * threads if there will be many instances. + * @param kafkaProducer producer instance to be instrumented + * @param tags additional tags + * @param scheduler custom scheduler to check and bind metrics + * @since 1.14.0 + */ + public KafkaClientMetrics(Producer kafkaProducer, Iterable tags, ScheduledExecutorService scheduler) { + super(kafkaProducer::metrics, tags, scheduler); + } + /** * Kafka {@link Producer} metrics binder * @param kafkaProducer producer instance to be instrumented @@ -60,6 +77,21 @@ public KafkaClientMetrics(Producer kafkaProducer) { super(kafkaProducer::metrics); } + /** + * Kafka {@link Consumer} metrics binder. The lifecycle of the custom scheduler passed + * is the responsibility of the caller. It will not be shut down when this instance is + * {@link #close() closed}. A scheduler can be shared among multiple instances of + * {@link KafkaClientMetrics} to reduce resource usage by reducing the number of + * threads if there will be many instances. + * @param kafkaConsumer consumer instance to be instrumented + * @param tags additional tags + * @param scheduler custom scheduler to check and bind metrics + * @since 1.14.0 + */ + public KafkaClientMetrics(Consumer kafkaConsumer, Iterable tags, ScheduledExecutorService scheduler) { + super(kafkaConsumer::metrics, tags, scheduler); + } + /** * Kafka {@link Consumer} metrics binder * @param kafkaConsumer consumer instance to be instrumented @@ -77,6 +109,21 @@ public KafkaClientMetrics(Consumer kafkaConsumer) { super(kafkaConsumer::metrics); } + /** + * Kafka {@link AdminClient} metrics binder. The lifecycle of the custom scheduler + * passed is the responsibility of the caller. It will not be shut down when this + * instance is {@link #close() closed}. A scheduler can be shared among multiple + * instances of {@link KafkaClientMetrics} to reduce resource usage by reducing the + * number of threads if there will be many instances. + * @param adminClient instance to be instrumented + * @param tags additional tags + * @param scheduler custom scheduler to check and bind metrics + * @since 1.14.0 + */ + public KafkaClientMetrics(AdminClient adminClient, Iterable tags, ScheduledExecutorService scheduler) { + super(adminClient::metrics, tags, scheduler); + } + /** * Kafka {@link AdminClient} metrics binder * @param adminClient instance to be instrumented diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java index 5e90c7561b..6dc118338a 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaMetrics.java @@ -71,6 +71,8 @@ class KafkaMetrics implements MeterBinder, AutoCloseable { static final String KAFKA_VERSION_TAG_NAME = "kafka.version"; static final String DEFAULT_VALUE = "unknown"; + private static final String DEFAULT_SCHEDULER_THREAD_NAME_PREFIX = "micrometer-kafka-metrics"; + private static final Set> counterMeasurableClasses = new HashSet<>(); static { @@ -96,8 +98,9 @@ class KafkaMetrics implements MeterBinder, AutoCloseable { private final Duration refreshInterval; - private final ScheduledExecutorService scheduler = Executors - .newSingleThreadScheduledExecutor(new NamedThreadFactory("micrometer-kafka-metrics")); + private final ScheduledExecutorService scheduler; + + private final boolean schedulerExternallyManaged; @Nullable private Iterable commonTags; @@ -122,11 +125,23 @@ class KafkaMetrics implements MeterBinder, AutoCloseable { this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL); } + KafkaMetrics(Supplier> metricsSupplier, Iterable extraTags, + ScheduledExecutorService scheduler) { + this(metricsSupplier, extraTags, DEFAULT_REFRESH_INTERVAL, scheduler, true); + } + KafkaMetrics(Supplier> metricsSupplier, Iterable extraTags, Duration refreshInterval) { + this(metricsSupplier, extraTags, refreshInterval, createDefaultScheduler(), false); + } + + KafkaMetrics(Supplier> metricsSupplier, Iterable extraTags, + Duration refreshInterval, ScheduledExecutorService scheduler, boolean schedulerExternallyManaged) { this.metricsSupplier = metricsSupplier; this.extraTags = extraTags; this.refreshInterval = refreshInterval; + this.scheduler = scheduler; + this.schedulerExternallyManaged = schedulerExternallyManaged; } @Override @@ -295,6 +310,10 @@ private static Class getMeasurableClass(Metric metric) { } } + private static ScheduledExecutorService createDefaultScheduler() { + return Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(DEFAULT_SCHEDULER_THREAD_NAME_PREFIX)); + } + private Gauge registerGauge(MeterRegistry registry, MetricName metricName, String meterName, Iterable tags) { return Gauge.builder(meterName, this.metrics, toMetricValue(metricName)) .tags(tags) @@ -344,7 +363,9 @@ private Meter.Id meterIdForComparison(MetricName metricName) { @Override public void close() { - this.scheduler.shutdownNow(); + if (!schedulerExternallyManaged) { + this.scheduler.shutdownNow(); + } for (Meter.Id id : registeredMeterIds) { registry.remove(id); diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java index 3f0f7d569a..07ff3ccb69 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetrics.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.streams.KafkaStreams; +import java.util.concurrent.ScheduledExecutorService; + /** * Kafka Streams metrics binder. This should be closed on application shutdown to clean up * resources. @@ -58,4 +60,19 @@ public KafkaStreamsMetrics(KafkaStreams kafkaStreams) { super(kafkaStreams::metrics); } + /** + * {@link KafkaStreams} metrics binder. The lifecycle of the custom scheduler passed + * is the responsibility of the caller. It will not be shut down when this instance is + * {@link #close() closed}. A scheduler can be shared among multiple instances of + * {@link KafkaStreamsMetrics} to reduce resource usage by reducing the number of + * threads if there will be many instances. + * @param kafkaStreams instance to be instrumented + * @param tags additional tags + * @param scheduler customer scheduler to run the task that checks and binds metrics + * @since 1.14.0 + */ + public KafkaStreamsMetrics(KafkaStreams kafkaStreams, Iterable tags, ScheduledExecutorService scheduler) { + super(kafkaStreams::metrics, tags, scheduler); + } + } diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java index e060af2ffe..4dae4edf95 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private Tags tags = Tags.of("app", "myapp", "version", "1"); + private final Tags tags = Tags.of("app", "myapp", "version", "1"); KafkaClientMetrics metrics; @@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (AdminClient adminClient = createAdmin()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1); + metrics = new KafkaClientMetrics(adminClient, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private AdminClient createAdmin() { Properties adminConfig = new Properties(); adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java index 7908f318d2..eb783f143c 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsConsumerTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.clients.consumer.ConsumerConfig.*; @@ -34,7 +36,7 @@ class KafkaClientMetricsConsumerTest { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private Tags tags = Tags.of("app", "myapp", "version", "1"); + private final Tags tags = Tags.of("app", "myapp", "version", "1"); KafkaClientMetrics metrics; @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (Consumer consumer = createConsumer()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1); + metrics = new KafkaClientMetrics(consumer, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private Consumer createConsumer() { Properties consumerConfig = new Properties(); consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java index 7d8131ff52..3d0d94ec02 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.clients.producer.ProducerConfig.*; @@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private Tags tags = Tags.of("app", "myapp", "version", "1"); + private final Tags tags = Tags.of("app", "myapp", "version", "1"); KafkaClientMetrics metrics; @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (Producer producer = createProducer()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1); + metrics = new KafkaClientMetrics(producer, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private Producer createProducer() { Properties producerConfig = new Properties(); producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java index 452c5254f8..ff44ffd1a8 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaMetricsTest.java @@ -34,10 +34,13 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; class KafkaMetricsTest { @@ -68,7 +71,7 @@ void shouldKeepMetersWhenMetricsDoNotChange() { } @Test - void closeShouldRemoveAllMeters() { + void closeShouldRemoveAllMetersAndShutdownDefaultScheduler() { // Given Supplier> supplier = () -> { MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>()); @@ -80,9 +83,35 @@ void closeShouldRemoveAllMeters() { kafkaMetrics.bindTo(registry); assertThat(registry.getMeters()).hasSize(1); + assertThat(isDefaultMetricsSchedulerThreadAlive()).isTrue(); kafkaMetrics.close(); assertThat(registry.getMeters()).isEmpty(); + await().until(() -> !isDefaultMetricsSchedulerThreadAlive()); + } + + @Test + void closeShouldRemoveAllMetersAndNotShutdownCustomScheduler() { + // Given + Supplier> supplier = () -> { + MetricName metricName = new MetricName("a", "b", "c", new LinkedHashMap<>()); + KafkaMetric metric = new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); + return Collections.singletonMap(metricName, metric); + }; + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1); + kafkaMetrics = new KafkaMetrics(supplier, Collections.emptyList(), customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + kafkaMetrics.bindTo(registry); + assertThat(registry.getMeters()).hasSize(1); + await().until(() -> !isDefaultMetricsSchedulerThreadAlive()); + + kafkaMetrics.close(); + assertThat(registry.getMeters()).isEmpty(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); } @Test @@ -552,4 +581,13 @@ private KafkaMetric createKafkaMetric(MetricName metricName) { return new KafkaMetric(this, metricName, new Value(), new MetricConfig(), Time.SYSTEM); } + private static boolean isDefaultMetricsSchedulerThreadAlive() { + return Thread.getAllStackTraces() + .keySet() + .stream() + .filter(Thread::isAlive) + .map(Thread::getName) + .anyMatch(name -> name.startsWith("micrometer-kafka-metrics")); + } + } diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java index ff1ba3f317..0afd6f119b 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaStreamsMetricsTest.java @@ -24,6 +24,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaStreamsMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG; @@ -73,6 +75,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (KafkaStreams kafkaStreams = createStreams()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(1); + metrics = new KafkaStreamsMetrics(kafkaStreams, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private KafkaStreams createStreams() { StreamsBuilder builder = new StreamsBuilder(); builder.stream("input").to("output");