From ad1d2999cc6704dae9a6e43bbee65588b65cb50e Mon Sep 17 00:00:00 2001 From: Vasyl Sarzhynskyi Date: Fri, 11 Oct 2024 16:14:40 +0200 Subject: [PATCH] Fixes #4976 - added optional custom scheduler to Producer and AdminClient KafkaClientMetrics --- .../binder/kafka/KafkaClientMetrics.java | 20 +++++++++++++++ .../kafka/KafkaClientMetricsAdminTest.java | 25 ++++++++++++++++++- .../kafka/KafkaClientMetricsProducerTest.java | 25 ++++++++++++++++++- 3 files changed, 68 insertions(+), 2 deletions(-) diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java index a7b009c88d..06bebfc346 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetrics.java @@ -45,6 +45,16 @@ @NonNullFields public class KafkaClientMetrics extends KafkaMetrics { + /** + * Kafka {@link Producer} metrics binder + * @param kafkaProducer producer instance to be instrumented + * @param tags additional tags + * @param scheduler scheduler to check and bind metrics`` + */ + public KafkaClientMetrics(Producer kafkaProducer, Iterable tags, ScheduledExecutorService scheduler) { + super(kafkaProducer::metrics, tags, scheduler); + } + /** * Kafka {@link Producer} metrics binder * @param kafkaProducer producer instance to be instrumented @@ -89,6 +99,16 @@ public KafkaClientMetrics(Consumer kafkaConsumer) { super(kafkaConsumer::metrics); } + /** + * Kafka {@link AdminClient} metrics binder + * @param adminClient instance to be instrumented + * @param tags additional tags + * @param scheduler scheduler to check and bind metrics + */ + public KafkaClientMetrics(AdminClient adminClient, Iterable tags, ScheduledExecutorService scheduler) { + super(adminClient::metrics, tags, scheduler); + } + /** * Kafka {@link AdminClient} metrics binder * @param adminClient instance to be instrumented diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java index e060af2ffe..04e1e29574 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsAdminTest.java @@ -23,6 +23,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG; @@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private Tags tags = Tags.of("app", "myapp", "version", "1"); + private final Tags tags = Tags.of("app", "myapp", "version", "1"); KafkaClientMetrics metrics; @@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (AdminClient adminClient = createAdmin()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2); + metrics = new KafkaClientMetrics(adminClient, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private AdminClient createAdmin() { Properties adminConfig = new Properties(); adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java index 7d8131ff52..eaf6cfa28e 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/binder/kafka/KafkaClientMetricsProducerTest.java @@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test; import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX; import static org.apache.kafka.clients.producer.ProducerConfig.*; @@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest { private static final String BOOTSTRAP_SERVERS = "localhost:9092"; - private Tags tags = Tags.of("app", "myapp", "version", "1"); + private final Tags tags = Tags.of("app", "myapp", "version", "1"); KafkaClientMetrics metrics; @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() { } } + @Test + void shouldCreateMetersWithTagsAndCustomScheduler() { + try (Producer producer = createProducer()) { + ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2); + metrics = new KafkaClientMetrics(producer, tags, customScheduler); + MeterRegistry registry = new SimpleMeterRegistry(); + + metrics.bindTo(registry); + + assertThat(registry.getMeters()).hasSizeGreaterThan(0) + .extracting(meter -> meter.getId().getTag("app")) + .allMatch(s -> s.equals("myapp")); + + metrics.close(); + assertThat(customScheduler.isShutdown()).isFalse(); + + customScheduler.shutdownNow(); + assertThat(customScheduler.isShutdown()).isTrue(); + } + } + private Producer createProducer() { Properties producerConfig = new Properties(); producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);