Skip to content

Commit

Permalink
Fixes #4976 - added optional custom scheduler to Producer and AdminCl…
Browse files Browse the repository at this point in the history
…ient KafkaClientMetrics
  • Loading branch information
vasiliy-sarzhynskyi committed Oct 11, 2024
1 parent 06cff5f commit ad1d299
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
@NonNullFields
public class KafkaClientMetrics extends KafkaMetrics {

/**
* Kafka {@link Producer} metrics binder
* @param kafkaProducer producer instance to be instrumented
* @param tags additional tags
* @param scheduler scheduler to check and bind metrics``
*/
public KafkaClientMetrics(Producer<?, ?> kafkaProducer, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(kafkaProducer::metrics, tags, scheduler);
}

/**
* Kafka {@link Producer} metrics binder
* @param kafkaProducer producer instance to be instrumented
Expand Down Expand Up @@ -89,6 +99,16 @@ public KafkaClientMetrics(Consumer<?, ?> kafkaConsumer) {
super(kafkaConsumer::metrics);
}

/**
* Kafka {@link AdminClient} metrics binder
* @param adminClient instance to be instrumented
* @param tags additional tags
* @param scheduler scheduler to check and bind metrics
*/
public KafkaClientMetrics(AdminClient adminClient, Iterable<Tag> tags, ScheduledExecutorService scheduler) {
super(adminClient::metrics, tags, scheduler);
}

/**
* Kafka {@link AdminClient} metrics binder
* @param adminClient instance to be instrumented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
Expand All @@ -32,7 +34,7 @@ class KafkaClientMetricsAdminTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -69,6 +71,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (AdminClient adminClient = createAdmin()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
metrics = new KafkaClientMetrics(adminClient, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private AdminClient createAdmin() {
Properties adminConfig = new Properties();
adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.Test;

import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics.METRIC_NAME_PREFIX;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
Expand All @@ -34,7 +36,7 @@ class KafkaClientMetricsProducerTest {

private static final String BOOTSTRAP_SERVERS = "localhost:9092";

private Tags tags = Tags.of("app", "myapp", "version", "1");
private final Tags tags = Tags.of("app", "myapp", "version", "1");

KafkaClientMetrics metrics;

Expand Down Expand Up @@ -71,6 +73,27 @@ void shouldCreateMetersWithTags() {
}
}

@Test
void shouldCreateMetersWithTagsAndCustomScheduler() {
try (Producer<String, String> producer = createProducer()) {
ScheduledExecutorService customScheduler = Executors.newScheduledThreadPool(2);
metrics = new KafkaClientMetrics(producer, tags, customScheduler);
MeterRegistry registry = new SimpleMeterRegistry();

metrics.bindTo(registry);

assertThat(registry.getMeters()).hasSizeGreaterThan(0)
.extracting(meter -> meter.getId().getTag("app"))
.allMatch(s -> s.equals("myapp"));

metrics.close();
assertThat(customScheduler.isShutdown()).isFalse();

customScheduler.shutdownNow();
assertThat(customScheduler.isShutdown()).isTrue();
}
}

private Producer<String, String> createProducer() {
Properties producerConfig = new Properties();
producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
Expand Down

0 comments on commit ad1d299

Please sign in to comment.