diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java index 4c43c5776d..1e8dfccca0 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java @@ -89,6 +89,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry { private long deltaAggregationTimeUnixNano = 0L; + // Time when the last scheduled rollOver has started. Applicable only for delta + // flavour. + private long lastMeterRolloverStartTime = -1; + @Nullable private ScheduledExecutorService meterPollingService; @@ -247,8 +251,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() { public void close() { stop(); if (config.enabled() && isDelta() && !isClosed()) { - if (!isDataPublishedForCurrentStep() && !isPublishing()) { - // Data was not published for the current step. So, we should flush that + if (shouldPublishDataForLastStep() && !isPublishing()) { + // Data was not published for the last step. So, we should flush that // first. try { publish(); @@ -267,9 +271,13 @@ else if (isPublishing()) { super.close(); } - private boolean isDataPublishedForCurrentStep() { - return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime() - / config.step().toMillis()); + private boolean shouldPublishDataForLastStep() { + if (lastMeterRolloverStartTime < 0) + return false; + + final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis(); + final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis(); + return lastPublishedStep < lastPolledStep; } // Either we do this or make StepMeter public @@ -346,6 +354,7 @@ private Metric writeSum(Meter meter, DoubleSupplier count) { */ // VisibleForTesting void pollMetersToRollover() { + this.lastMeterRolloverStartTime = clock.wallTime(); this.getMeters() .forEach(m -> m.match(gauge -> null, Counter::count, Timer::takeSnapshot, DistributionSummary::takeSnapshot, meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null)); diff --git a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java index 396f5422e7..1cbdffe01c 100644 --- a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java +++ b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java @@ -726,6 +726,20 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); } + @Test + @Issue("#4357") + void publishOnceWhenClosedWithinFirstStep() { + // Set the initial clock time to a valid time. + MockClock mockClock = new MockClock(); + mockClock.add(otlpConfig().step().multipliedBy(5)); + + TestOtlpMeterRegistry stepMeterRegistry = new TestOtlpMeterRegistry(otlpConfig(), mockClock); + + assertThat(stepMeterRegistry.publishCount.get()).isZero(); + stepMeterRegistry.close(); + assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1); + } + private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) { assertThat(snapshot.count()).isZero(); assertThat(snapshot.total()).isZero(); @@ -771,6 +785,8 @@ private void assertHistogramContains(HistogramSnapshot snapshot, double total, d private class TestOtlpMeterRegistry extends OtlpMeterRegistry { + private final AtomicInteger publishCount = new AtomicInteger(); + Deque publishedCounterCounts = new ArrayDeque<>(); Deque publishedTimerCounts = new ArrayDeque<>(); @@ -795,18 +811,24 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry { Deque publishedFunctionTimerTotals = new ArrayDeque<>(); - private long lastScheduledPublishStartTime = 0L; + private long lastScheduledPublishStartTime; AtomicBoolean isPublishing = new AtomicBoolean(false); CompletableFuture scheduledPublishingFuture = CompletableFuture.completedFuture(null); TestOtlpMeterRegistry() { - super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock); + this(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock); + } + + TestOtlpMeterRegistry(OtlpConfig otlpConfig, Clock clock) { + super(otlpConfig, clock); + this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime(); } @Override protected void publish() { + publishCount.incrementAndGet(); forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary, null, null, this::publishFunctionCounter, this::publishFunctionTimer, null)); } diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java index 2435591425..60c522b394 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java @@ -49,6 +49,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry { @Nullable private ScheduledExecutorService meterPollingService; + // Time when the last scheduled rollOver has started. + private long lastMeterRolloverStartTime = -1; + public StepMeterRegistry(StepRegistryConfig config, Clock clock) { super(config, clock); this.config = config; @@ -139,9 +142,9 @@ public void close() { stop(); if (config.enabled() && !isClosed()) { - if (!isDataPublishedForCurrentStep() && !isPublishing()) { - // Data was not published for the current step. So, we should flush that - // first. + if (shouldPublishDataForLastStep() && !isPublishing()) { + // Data was not published for the last completed step. So, we should flush + // that first. try { publish(); } @@ -159,9 +162,13 @@ else if (isPublishing()) { super.close(); } - private boolean isDataPublishedForCurrentStep() { - return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime() - / config.step().toMillis()); + private boolean shouldPublishDataForLastStep() { + if (lastMeterRolloverStartTime < 0) + return false; + + final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis(); + final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis(); + return lastPublishedStep < lastPolledStep; } /** @@ -181,6 +188,7 @@ private void closingRolloverStepMeters() { */ // VisibleForTesting void pollMetersToRollover() { + this.lastMeterRolloverStartTime = clock.wallTime(); this.getMeters() .forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count, meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null)); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java index cd8927022e..75200ea73b 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java @@ -50,8 +50,6 @@ */ class StepMeterRegistryTest { - private final AtomicInteger publishes = new AtomicInteger(); - private final MockClock clock = new MockClock(); private final StepRegistryConfig config = new StepRegistryConfig() { @@ -98,9 +96,9 @@ void serviceLevelObjectivesOnlyNoPercentileHistogram() { @Issue("#484") @Test void publishOneLastTimeOnClose() { - assertThat(publishes.get()).isEqualTo(0); + assertThat(registry.publishCount.get()).isZero(); registry.close(); - assertThat(publishes.get()).isEqualTo(1); + assertThat(registry.publishCount.get()).isEqualTo(1); } @Issue("#1993") @@ -425,7 +423,7 @@ void scheduledRollOver() { } @Test - @Issue("3914") + @Issue("#3914") void publishShouldNotHappenWhenRegistryIsDisabled() { StepRegistryConfig disabledStepRegistryConfig = new StepRegistryConfig() { @Override @@ -449,27 +447,26 @@ public String get(String key) { Counter.builder("publish_disabled_counter").register(disabledStepMeterRegistry).increment(); clock.add(config.step()); - assertThat(publishes.get()).isZero(); + assertThat(disabledStepMeterRegistry.publishCount.get()).isZero(); disabledStepMeterRegistry.close(); - assertThat(publishes.get()).isZero(); + assertThat(disabledStepMeterRegistry.publishCount.get()).isZero(); } @Test - @Issue("3914") + @Issue("#3914") void publishShouldNotHappenWhenRegistryIsClosed() { Counter.builder("my.counter").register(registry).increment(); clock.add(config.step()); - assertThat(publishes.get()).isZero(); + assertThat(registry.publishCount.get()).isZero(); registry.close(); - assertThat(publishes.get()).isEqualTo(2); - assertThat(registry.publishedCounterCounts).hasSize(2); - assertThat(registry.publishedCounterCounts.getFirst()).isOne(); - assertThat(registry.publishedCounterCounts.getLast()).isZero(); + assertThat(registry.publishCount.get()).isEqualTo(1); + assertThat(registry.publishedCounterCounts).hasSize(1); clock.add(config.step()); registry.close(); - assertThat(publishes.get()).isEqualTo(2); + assertThat(registry.publishCount.get()).isEqualTo(1); + assertThat(registry.publishedCounterCounts).hasSize(1); } @Test @@ -557,8 +554,23 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); } + @Test + @Issue("#4357") + void publishOnceWhenClosedWithinFirstStep() { + // Set the initial clock time to a valid time. + MockClock mockClock = new MockClock(); + mockClock.add(config.step().multipliedBy(5)); + + MyStepMeterRegistry stepMeterRegistry = new MyStepMeterRegistry(config, mockClock); + assertThat(stepMeterRegistry.publishCount.get()).isZero(); + stepMeterRegistry.close(); + assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1); + } + private class MyStepMeterRegistry extends StepMeterRegistry { + private final AtomicInteger publishCount = new AtomicInteger(); + Deque publishedCounterCounts = new ArrayDeque<>(); Deque publishedTimerCounts = new ArrayDeque<>(); @@ -575,7 +587,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry { Deque publishedFunctionTimerTotals = new ArrayDeque<>(); - private long lastScheduledPublishStartTime = 0L; + private long lastScheduledPublishStartTime; @Nullable Runnable prePublishAction; @@ -590,6 +602,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry { MyStepMeterRegistry(StepRegistryConfig config, Clock clock) { super(config, clock); + this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime(); } void setPrePublishAction(Runnable prePublishAction) { @@ -601,7 +614,7 @@ protected void publish() { if (prePublishAction != null) { prePublishAction.run(); } - publishes.incrementAndGet(); + publishCount.incrementAndGet(); getMeters().stream() .map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary, null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))