Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Virtual threads metrics #5067

Merged
merged 20 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ subprojects {

check.dependsOn("testModules")

if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
if (!(project.name in ['micrometer-test-aspectj-ltw', 'micrometer-test-aspectj-ctw', 'micrometer-java21'])) { // add projects here that do not exist in the previous minor so should be excluded from japicmp
apply plugin: 'me.champeau.gradle.japicmp'
apply plugin: 'de.undercouch.download'

Expand Down
40 changes: 40 additions & 0 deletions micrometer-java21/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
description 'Micrometer core classes that require Java 21'

// skip this module when building with jdk <21
if (!javaLanguageVersion.canCompileOrRun(21)) {
project.tasks.configureEach { task -> task.enabled = false }
}

dependencies {
api project(':micrometer-core')

testImplementation 'org.junit.jupiter:junit-jupiter'
testImplementation 'org.assertj:assertj-core'
testImplementation 'org.awaitility:awaitility'
}

java {
targetCompatibility = 21
}

tasks.withType(JavaCompile).configureEach {
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
options.release = 21
}

task reflectiveTests(type: Test) {
useJUnitPlatform {
includeTags 'reflective'
}

// This hack is needed since VirtualThreadMetricsReflectiveTests utilizes reflection against java.lang, see its javadoc
jvmArgs += ['--add-opens', 'java.base/java.lang=ALL-UNNAMED']
jonatan-ivanov marked this conversation as resolved.
Show resolved Hide resolved
}

test {
dependsOn reflectiveTests
useJUnitPlatform {
excludeTags 'reflective'
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.binder.MeterBinder;
import jdk.jfr.consumer.RecordingStream;

import java.io.Closeable;
import java.time.Duration;
import java.util.Objects;

import static java.util.Collections.emptyList;

/**
* Instrumentation support for Virtual Threads, see:
* https://openjdk.org/jeps/425#JDK-Flight-Recorder-JFR
*
* @author Artyom Gabeev
* @since 1.14.0
*/
public class VirtualThreadMetrics implements MeterBinder, Closeable {

private static final String PINNED_EVENT = "jdk.VirtualThreadPinned";

private static final String SUBMIT_FAILED_EVENT = "jdk.VirtualThreadSubmitFailed";

private final RecordingStream recordingStream;

private final Iterable<Tag> tags;

public VirtualThreadMetrics() {
this(new RecordingConfig(), emptyList());
}

public VirtualThreadMetrics(Iterable<Tag> tags) {
this(new RecordingConfig(), tags);
}

private VirtualThreadMetrics(RecordingConfig config, Iterable<Tag> tags) {
this.recordingStream = createRecordingStream(config);
this.tags = tags;
}

@Override
public void bindTo(MeterRegistry registry) {
Timer pinnedTimer = Timer.builder("jvm.threads.virtual.pinned")
.description("The duration while the virtual thread was pinned without releasing its platform thread")
.tags(tags)
.register(registry);

Counter submitFailedCounter = Counter.builder("jvm.threads.virtual.submit.failed")
.description("The number of events when starting or unparking a virtual thread failed")
.tags(tags)
.register(registry);

recordingStream.onEvent(PINNED_EVENT, event -> pinnedTimer.record(event.getDuration()));
recordingStream.onEvent(SUBMIT_FAILED_EVENT, event -> submitFailedCounter.increment());
}

private RecordingStream createRecordingStream(RecordingConfig config) {
RecordingStream recordingStream = new RecordingStream();
recordingStream.enable(PINNED_EVENT).withThreshold(config.pinnedThreshold);
recordingStream.enable(SUBMIT_FAILED_EVENT);
recordingStream.setMaxAge(config.maxAge);
recordingStream.setMaxSize(config.maxSizeBytes);
recordingStream.startAsync();
shakuzen marked this conversation as resolved.
Show resolved Hide resolved

return recordingStream;
}

@Override
public void close() {
recordingStream.close();
}

private record RecordingConfig(Duration maxAge, long maxSizeBytes, Duration pinnedThreshold) {
private RecordingConfig() {
this(Duration.ofSeconds(5), 10L * 1024 * 1024, Duration.ofMillis(20));
}

private RecordingConfig {
Objects.requireNonNull(maxAge, "maxAge parameter must not be null");
Objects.requireNonNull(pinnedThreshold, "pinnedThreshold must not be null");
if (maxSizeBytes < 0) {
throw new IllegalArgumentException("maxSizeBytes must be positive");
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Instrumentation of JDK classes.
*/
@NonNullApi
@NonNullFields
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.common.lang.NonNullApi;
import io.micrometer.common.lang.NonNullFields;
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright 2024 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.java21.instrument.binder.jdk;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Constructor;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;

import static java.lang.Thread.State.WAITING;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.awaitility.Awaitility.await;

/**
* Tests for {@link VirtualThreadMetrics}. If you run these tests from your IDE,
* {@link #submitFailedEventsShouldBeRecorded()} might fail depending on your setup. This
* is because the test (through {@link #virtualThreadFactoryFor(Executor)}) utilizes
* reflection against the {@code java.lang} package which needs to be explicitly enabled.
* If you run into such an issue you can either change your setup and let your IDE run the
* tests utilizing the build system (Gradle) or add the following JVM arg to your test
* config: {@code --add-opens java.base/java.lang=ALL-UNNAMED}
*
* @author Artyom Gabeev
* @author Jonatan Ivanov
*/
@Tag("reflective")
class VirtualThreadMetricsReflectiveTests {

private static final Tags TAGS = Tags.of("k", "v");

private SimpleMeterRegistry registry;

private VirtualThreadMetrics virtualThreadMetrics;

@BeforeEach
void setUp() {
registry = new SimpleMeterRegistry();
virtualThreadMetrics = new VirtualThreadMetrics(TAGS);
virtualThreadMetrics.bindTo(registry);
}

@AfterEach
void tearDown() {
virtualThreadMetrics.close();
}

/**
* Uses a similar approach as the JDK tests to make starting or unparking a virtual
* thread fail, see {@link #virtualThreadFactoryFor(Executor)} and <a href=
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/jdk/java/lang/Thread/virtual/JfrEvents.java#L143-L187">JfrEvents.java</a>
*/
@Test
void submitFailedEventsShouldBeRecorded() {
try (ExecutorService cachedPool = Executors.newCachedThreadPool()) {
ThreadFactory factory = virtualThreadFactoryFor(cachedPool);
Thread thread = factory.newThread(LockSupport::park);
thread.start();

await().atMost(Duration.ofSeconds(2)).until(() -> thread.getState() == WAITING);
cachedPool.shutdown();

// unpark, the pool was shut down, this should fail
assertThatThrownBy(() -> LockSupport.unpark(thread)).isInstanceOf(RejectedExecutionException.class);

Counter counter = registry.get("jvm.threads.virtual.submit.failed").tags(TAGS).counter();
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 1);

// park, the pool was shut down, this should fail
assertThatThrownBy(() -> factory.newThread(LockSupport::park).start())
.isInstanceOf(RejectedExecutionException.class);
await().atMost(Duration.ofSeconds(2)).until(() -> counter.count() == 2);
}
}

/**
* Creates a {@link ThreadFactory} for virtual threads. The created virtual threads
* will be bound to the provided platform thread pool instead of a default
* ForkJoinPool. At its current form, this is a hack, it utilizes reflection to supply
* the platform thread pool. It seems though there is no other way of doing this, the
* JDK tests are also utilizing reflection to do the same, see: <a href=
* "https://github.com/openjdk/jdk/blob/fdfe503d016086cf78b5a8c27dbe45f0261c68ab/test/lib/jdk/test/lib/thread/VThreadScheduler.java#L71-L90">VThreadScheduler.java</a>
* @param pool platform pool
* @return virtual thread factory bound to the provided platform pool
*/
private static ThreadFactory virtualThreadFactoryFor(Executor pool) {
try {
Class<?> clazz = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
Constructor<?> constructor = clazz.getDeclaredConstructor(Executor.class);
constructor.setAccessible(true);
return ((Thread.Builder.OfVirtual) constructor.newInstance(pool)).factory();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

}
Loading