From 2735d262f4bac5a1a92050d6677f920d9eea0e51 Mon Sep 17 00:00:00 2001 From: Lenin Jaganathan <32874349+lenin-jaganathan@users.noreply.github.com> Date: Mon, 11 Dec 2023 13:27:12 +0530 Subject: [PATCH] Avoid duplicate publish on StepMeterRegistry when closed within first step. (#4485) Do not perform a publish for the previous step if there has not been a previous step yet. This is achieved by internally tracking when polling of meters happens (to rollover values) and it will skip doing the publish for the previous step if they have never been rolled over. Resolves gh-4357 --- .../registry/otlp/OtlpMeterRegistry.java | 19 +++++--- .../otlp/OtlpDeltaMeterRegistryTest.java | 26 ++++++++++- .../instrument/step/StepMeterRegistry.java | 20 ++++++--- .../step/StepMeterRegistryTest.java | 45 ++++++++++++------- 4 files changed, 81 insertions(+), 29 deletions(-) diff --git a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java index 811c24b6df..3be079089d 100644 --- a/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java +++ b/implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/OtlpMeterRegistry.java @@ -87,6 +87,10 @@ public class OtlpMeterRegistry extends PushMeterRegistry { private long deltaAggregationTimeUnixNano = 0L; + // Time when the last scheduled rollOver has started. Applicable only for delta + // flavour. + private long lastMeterRolloverStartTime = -1; + @Nullable private ScheduledExecutorService meterPollingService; @@ -244,8 +248,8 @@ protected DistributionStatisticConfig defaultHistogramConfig() { public void close() { stop(); if (config.enabled() && isDelta() && !isClosed()) { - if (!isDataPublishedForCurrentStep() && !isPublishing()) { - // Data was not published for the current step. So, we should flush that + if (shouldPublishDataForLastStep() && !isPublishing()) { + // Data was not published for the last step. So, we should flush that // first. try { publish(); @@ -264,9 +268,13 @@ else if (isPublishing()) { super.close(); } - private boolean isDataPublishedForCurrentStep() { - return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime() - / config.step().toMillis()); + private boolean shouldPublishDataForLastStep() { + if (lastMeterRolloverStartTime < 0) + return false; + + final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis(); + final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis(); + return lastPublishedStep < lastPolledStep; } // Either we do this or make StepMeter public @@ -343,6 +351,7 @@ private Metric writeSum(Meter meter, DoubleSupplier count) { */ // VisibleForTesting void pollMetersToRollover() { + this.lastMeterRolloverStartTime = clock.wallTime(); this.getMeters() .forEach(m -> m.match(gauge -> null, Counter::count, Timer::takeSnapshot, DistributionSummary::takeSnapshot, meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null)); diff --git a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java index 396f5422e7..1cbdffe01c 100644 --- a/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java +++ b/implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/OtlpDeltaMeterRegistryTest.java @@ -726,6 +726,20 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); } + @Test + @Issue("#4357") + void publishOnceWhenClosedWithinFirstStep() { + // Set the initial clock time to a valid time. + MockClock mockClock = new MockClock(); + mockClock.add(otlpConfig().step().multipliedBy(5)); + + TestOtlpMeterRegistry stepMeterRegistry = new TestOtlpMeterRegistry(otlpConfig(), mockClock); + + assertThat(stepMeterRegistry.publishCount.get()).isZero(); + stepMeterRegistry.close(); + assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1); + } + private void assertEmptyHistogramSnapshot(HistogramSnapshot snapshot) { assertThat(snapshot.count()).isZero(); assertThat(snapshot.total()).isZero(); @@ -771,6 +785,8 @@ private void assertHistogramContains(HistogramSnapshot snapshot, double total, d private class TestOtlpMeterRegistry extends OtlpMeterRegistry { + private final AtomicInteger publishCount = new AtomicInteger(); + Deque publishedCounterCounts = new ArrayDeque<>(); Deque publishedTimerCounts = new ArrayDeque<>(); @@ -795,18 +811,24 @@ private class TestOtlpMeterRegistry extends OtlpMeterRegistry { Deque publishedFunctionTimerTotals = new ArrayDeque<>(); - private long lastScheduledPublishStartTime = 0L; + private long lastScheduledPublishStartTime; AtomicBoolean isPublishing = new AtomicBoolean(false); CompletableFuture scheduledPublishingFuture = CompletableFuture.completedFuture(null); TestOtlpMeterRegistry() { - super(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock); + this(otlpConfig(), OtlpDeltaMeterRegistryTest.this.clock); + } + + TestOtlpMeterRegistry(OtlpConfig otlpConfig, Clock clock) { + super(otlpConfig, clock); + this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime(); } @Override protected void publish() { + publishCount.incrementAndGet(); forEachMeter(meter -> meter.match(null, this::publishCounter, this::publishTimer, this::publishSummary, null, null, this::publishFunctionCounter, this::publishFunctionTimer, null)); } diff --git a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java index 2435591425..60c522b394 100644 --- a/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java +++ b/micrometer-core/src/main/java/io/micrometer/core/instrument/step/StepMeterRegistry.java @@ -49,6 +49,9 @@ public abstract class StepMeterRegistry extends PushMeterRegistry { @Nullable private ScheduledExecutorService meterPollingService; + // Time when the last scheduled rollOver has started. + private long lastMeterRolloverStartTime = -1; + public StepMeterRegistry(StepRegistryConfig config, Clock clock) { super(config, clock); this.config = config; @@ -139,9 +142,9 @@ public void close() { stop(); if (config.enabled() && !isClosed()) { - if (!isDataPublishedForCurrentStep() && !isPublishing()) { - // Data was not published for the current step. So, we should flush that - // first. + if (shouldPublishDataForLastStep() && !isPublishing()) { + // Data was not published for the last completed step. So, we should flush + // that first. try { publish(); } @@ -159,9 +162,13 @@ else if (isPublishing()) { super.close(); } - private boolean isDataPublishedForCurrentStep() { - return (getLastScheduledPublishStartTime() / config.step().toMillis()) == (clock.wallTime() - / config.step().toMillis()); + private boolean shouldPublishDataForLastStep() { + if (lastMeterRolloverStartTime < 0) + return false; + + final long lastPublishedStep = getLastScheduledPublishStartTime() / config.step().toMillis(); + final long lastPolledStep = lastMeterRolloverStartTime / config.step().toMillis(); + return lastPublishedStep < lastPolledStep; } /** @@ -181,6 +188,7 @@ private void closingRolloverStepMeters() { */ // VisibleForTesting void pollMetersToRollover() { + this.lastMeterRolloverStartTime = clock.wallTime(); this.getMeters() .forEach(m -> m.match(gauge -> null, Counter::count, Timer::count, DistributionSummary::count, meter -> null, meter -> null, FunctionCounter::count, FunctionTimer::count, meter -> null)); diff --git a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java index cd8927022e..75200ea73b 100644 --- a/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java +++ b/micrometer-core/src/test/java/io/micrometer/core/instrument/step/StepMeterRegistryTest.java @@ -50,8 +50,6 @@ */ class StepMeterRegistryTest { - private final AtomicInteger publishes = new AtomicInteger(); - private final MockClock clock = new MockClock(); private final StepRegistryConfig config = new StepRegistryConfig() { @@ -98,9 +96,9 @@ void serviceLevelObjectivesOnlyNoPercentileHistogram() { @Issue("#484") @Test void publishOneLastTimeOnClose() { - assertThat(publishes.get()).isEqualTo(0); + assertThat(registry.publishCount.get()).isZero(); registry.close(); - assertThat(publishes.get()).isEqualTo(1); + assertThat(registry.publishCount.get()).isEqualTo(1); } @Issue("#1993") @@ -425,7 +423,7 @@ void scheduledRollOver() { } @Test - @Issue("3914") + @Issue("#3914") void publishShouldNotHappenWhenRegistryIsDisabled() { StepRegistryConfig disabledStepRegistryConfig = new StepRegistryConfig() { @Override @@ -449,27 +447,26 @@ public String get(String key) { Counter.builder("publish_disabled_counter").register(disabledStepMeterRegistry).increment(); clock.add(config.step()); - assertThat(publishes.get()).isZero(); + assertThat(disabledStepMeterRegistry.publishCount.get()).isZero(); disabledStepMeterRegistry.close(); - assertThat(publishes.get()).isZero(); + assertThat(disabledStepMeterRegistry.publishCount.get()).isZero(); } @Test - @Issue("3914") + @Issue("#3914") void publishShouldNotHappenWhenRegistryIsClosed() { Counter.builder("my.counter").register(registry).increment(); clock.add(config.step()); - assertThat(publishes.get()).isZero(); + assertThat(registry.publishCount.get()).isZero(); registry.close(); - assertThat(publishes.get()).isEqualTo(2); - assertThat(registry.publishedCounterCounts).hasSize(2); - assertThat(registry.publishedCounterCounts.getFirst()).isOne(); - assertThat(registry.publishedCounterCounts.getLast()).isZero(); + assertThat(registry.publishCount.get()).isEqualTo(1); + assertThat(registry.publishedCounterCounts).hasSize(1); clock.add(config.step()); registry.close(); - assertThat(publishes.get()).isEqualTo(2); + assertThat(registry.publishCount.get()).isEqualTo(1); + assertThat(registry.publishedCounterCounts).hasSize(1); } @Test @@ -557,8 +554,23 @@ void whenCloseDuringScheduledPublish_thenPreviousStepAndCurrentPartialStepArePub assertThat(registry.publishedFunctionTimerTotals.pop()).isEqualTo(24); } + @Test + @Issue("#4357") + void publishOnceWhenClosedWithinFirstStep() { + // Set the initial clock time to a valid time. + MockClock mockClock = new MockClock(); + mockClock.add(config.step().multipliedBy(5)); + + MyStepMeterRegistry stepMeterRegistry = new MyStepMeterRegistry(config, mockClock); + assertThat(stepMeterRegistry.publishCount.get()).isZero(); + stepMeterRegistry.close(); + assertThat(stepMeterRegistry.publishCount.get()).isEqualTo(1); + } + private class MyStepMeterRegistry extends StepMeterRegistry { + private final AtomicInteger publishCount = new AtomicInteger(); + Deque publishedCounterCounts = new ArrayDeque<>(); Deque publishedTimerCounts = new ArrayDeque<>(); @@ -575,7 +587,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry { Deque publishedFunctionTimerTotals = new ArrayDeque<>(); - private long lastScheduledPublishStartTime = 0L; + private long lastScheduledPublishStartTime; @Nullable Runnable prePublishAction; @@ -590,6 +602,7 @@ private class MyStepMeterRegistry extends StepMeterRegistry { MyStepMeterRegistry(StepRegistryConfig config, Clock clock) { super(config, clock); + this.lastScheduledPublishStartTime = super.getLastScheduledPublishStartTime(); } void setPrePublishAction(Runnable prePublishAction) { @@ -601,7 +614,7 @@ protected void publish() { if (prePublishAction != null) { prePublishAction.run(); } - publishes.incrementAndGet(); + publishCount.incrementAndGet(); getMeters().stream() .map(meter -> meter.match(g -> null, this::publishCounter, this::publishTimer, this::publishSummary, null, tg -> null, this::publishFunctionCounter, this::publishFunctionTimer, m -> null))