Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Stackdriver Distribution count and bucket count sum mismatch #5836

Merged
merged 1 commit into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions implementations/micrometer-registry-stackdriver/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ dependencies {
api libs.googleOauth2Http
implementation 'org.slf4j:slf4j-api'
compileOnly 'ch.qos.logback:logback-classic'
// needed for extending TimeWindowPercentileHistogram in StackdriverHistogramUtil
compileOnly libs.hdrhistogram

testImplementation project(':micrometer-test')
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.step.StepDistributionSummary;

import static io.micrometer.stackdriver.StackdriverHistogramUtil.stackdriverHistogram;

class StackdriverDistributionSummary extends StepDistributionSummary {

public StackdriverDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis) {
super(id, clock, distributionStatisticConfig, scale, stepMillis,
stackdriverHistogram(clock, distributionStatisticConfig));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.*;

final class StackdriverHistogramUtil {

private StackdriverHistogramUtil() {
}

// copied and modified from AbstractDistributionSummary/AbstractTimer
static Histogram stackdriverHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
if (distributionStatisticConfig.isPublishingPercentiles()) {
return new StackdriverClientSidePercentilesHistogram(clock, distributionStatisticConfig);
}
if (distributionStatisticConfig.isPublishingHistogram()) {
return new StackdriverFixedBoundaryHistogram(clock, distributionStatisticConfig);
}
return NoopHistogram.INSTANCE;
}

static class StackdriverClientSidePercentilesHistogram extends TimeWindowPercentileHistogram {

StackdriverClientSidePercentilesHistogram(Clock clock,
DistributionStatisticConfig distributionStatisticConfig) {
super(clock, distributionStatisticConfig, true, false, true);
}

}

static class StackdriverFixedBoundaryHistogram extends TimeWindowFixedBoundaryHistogram {

StackdriverFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig config) {
super(clock, config, true, false, true);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.HistogramSupport;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepDistributionSummary;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.step.StepTimer;
import io.micrometer.core.instrument.util.DoubleFormat;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import org.slf4j.Logger;
Expand All @@ -46,7 +44,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -289,15 +286,15 @@ Stream<TimeSeries> createLongTaskTimer(Batch batch, LongTaskTimer longTaskTimer)
@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
return new StepDistributionSummary(id, clock, distributionStatisticConfig, scale, config.step().toMillis(),
true);
return new StackdriverDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis());
}

@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
return new StepTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
this.config.step().toMillis(), true);
return new StackdriverTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
this.config.step().toMillis());
}

@Override
Expand Down Expand Up @@ -508,18 +505,17 @@ private TimeInterval interval(MetricDescriptor.MetricKind metricKind) {
Distribution distribution(HistogramSnapshot snapshot, boolean timeDomain) {
CountAtBucket[] histogram = snapshot.histogramCounts();

// selected finite buckets (represented as a normal histogram)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need to convert cumulative buckets to non-cumulative because with the change to dedicated Stackdriver types for Timer and DistributionSummary, we pass the flag for non-cumulative histogram.

AtomicLong truncatedSum = new AtomicLong();
AtomicReference<Double> last = new AtomicReference<>(0.0);
List<Long> bucketCounts = Arrays.stream(histogram).map(countAtBucket -> {
double cumulativeCount = countAtBucket.count();
long bucketCount = (long) (cumulativeCount - last.getAndSet(cumulativeCount));
truncatedSum.addAndGet(bucketCount);
return bucketCount;
}).collect(toCollection(ArrayList::new));

if (!bucketCounts.isEmpty()) {
int endIndex = bucketCounts.size() - 1;
List<Long> bucketCounts = Arrays.stream(histogram)
.map(CountAtBucket::count)
.map(Double::longValue)
.collect(toCollection(ArrayList::new));
long cumulativeCount = Arrays.stream(histogram).mapToLong(c -> (long) c.count()).sum();

// no-op histogram will have no buckets; other histograms should have at least
// the +Inf bucket
if (!bucketCounts.isEmpty() && bucketCounts.size() > 1) {
// the rightmost bucket should be the infinity bucket; do not trim that
int endIndex = bucketCounts.size() - 2;
// trim zero-count buckets on the right side of the domain
if (bucketCounts.get(endIndex) == 0) {
int lastNonZeroIndex = 0;
Expand All @@ -529,30 +525,41 @@ Distribution distribution(HistogramSnapshot snapshot, boolean timeDomain) {
break;
}
}
long infCount = bucketCounts.get(bucketCounts.size() - 1);
bucketCounts = bucketCounts.subList(0, lastNonZeroIndex + 1);
// infinite bucket count of 0 can be omitted
bucketCounts.add(infCount);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this note here but did not omit 0 count infinity buckets to try to keep the behavior the same as before since this is a change in a maintenance branch.

}
}

// add the "+infinity" bucket, which does NOT have a corresponding bucket
// boundary
bucketCounts.add(Math.max(0, snapshot.count() - truncatedSum.get()));
// no-op histogram
if (bucketCounts.isEmpty()) {
bucketCounts.add(0L);
}

List<Double> bucketBoundaries = Arrays.stream(histogram)
.map(countAtBucket -> timeDomain ? countAtBucket.bucket(getBaseTimeUnit()) : countAtBucket.bucket())
.collect(toCollection(ArrayList::new));

if (bucketBoundaries.size() == 1) {
bucketBoundaries.remove(Double.POSITIVE_INFINITY);
}

// trim bucket boundaries to match bucket count trimming
if (bucketBoundaries.size() != bucketCounts.size() - 1) {
bucketBoundaries = bucketBoundaries.subList(0, bucketCounts.size() - 1);
}

// stackdriver requires at least one finite bucket
// Stackdriver requires at least one explicit bucket bound
if (bucketBoundaries.isEmpty()) {
bucketBoundaries.add(0.0);
}

return Distribution.newBuilder()
// is the mean optional? better to not send as it is for a different time
// window than the histogram
.setMean(timeDomain ? snapshot.mean(getBaseTimeUnit()) : snapshot.mean())
.setCount(snapshot.count())
.setCount(cumulativeCount)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part that fixes things - snapshot.count() is the step-based count, which does not align with the histogram time window, while the cumulativeCount is taken from the histogram.

.setBucketOptions(Distribution.BucketOptions.newBuilder()
.setExplicitBuckets(
Distribution.BucketOptions.Explicit.newBuilder().addAllBounds(bucketBoundaries).build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTimer;

import java.util.concurrent.TimeUnit;

import static io.micrometer.stackdriver.StackdriverHistogramUtil.stackdriverHistogram;

class StackdriverTimer extends StepTimer {

public StackdriverTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepDurationMillis) {
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, stepDurationMillis,
stackdriverHistogram(clock, distributionStatisticConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import java.time.Duration;
Expand Down Expand Up @@ -87,7 +86,6 @@ void batchDistributionWhenHistogramSnapshotIsEmpty() {

// gh-4868 is an issue when the step count is less than the histogram count
@Test
@Disabled("gh-4868")
void distributionCountMustEqualBucketCountsSum() {
DistributionSummary ds = DistributionSummary.builder("ds").serviceLevelObjectives(1, 2).register(meterRegistry);
ds.record(1);
Expand All @@ -102,7 +100,6 @@ void distributionCountMustEqualBucketCountsSum() {
}

@Test
@Disabled("gh-4868")
void distributionWithTimerShouldHaveInfinityBucket() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
Timer timer = Timer.builder("timer")
Expand Down Expand Up @@ -140,16 +137,17 @@ void distributionWithPercentileHistogram() {
}

@Test
void distributionWithOnlyClientSidePercentilesIsEmpty() {
void distributionWithOnlyClientSidePercentilesHasSingleBound() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
DistributionSummary ds = DistributionSummary.builder("ds")
.publishPercentiles(0.5, 0.99)
.register(meterRegistry);
ds.record(1);
ds.record(5);

Distribution distribution = batch.distribution(ds.takeSnapshot(), false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).containsExactly(0d);
assertThat(distribution.getBucketCountsList()).containsExactly(0L);
assertThat(distribution.getBucketCountsList()).containsExactly(1L);
assertThat(distribution.getCount()).isOne();
}

@Test
Expand All @@ -167,4 +165,16 @@ void distributionWithClientSidePercentilesAndBuckets() {
assertThat(distribution.getBucketCountsList()).containsExactly(1L, 0L, 1L, 0L);
}

@Test
void distributionWithOneExplicitBucket() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
DistributionSummary ds = DistributionSummary.builder("ds").serviceLevelObjectives(3).register(meterRegistry);
ds.record(1);
ds.record(5);

Distribution distribution = batch.distribution(ds.takeSnapshot(), false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).containsExactly(3d);
assertThat(distribution.getBucketCountsList()).containsExactly(1L, 1L);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

import java.io.PrintStream;
import java.lang.reflect.Array;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/**
Expand All @@ -45,8 +43,6 @@ abstract class AbstractTimeWindowHistogram<T, U> implements Histogram {

private final Clock clock;

private final boolean supportsAggregablePercentiles;

private final T[] ringBuffer;

private short currentBucket;
Expand All @@ -65,10 +61,9 @@ abstract class AbstractTimeWindowHistogram<T, U> implements Histogram {

@SuppressWarnings("unchecked")
AbstractTimeWindowHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig,
Class<T> bucketType, boolean supportsAggregablePercentiles) {
Class<T> bucketType) {
this.clock = clock;
this.distributionStatisticConfig = validateDistributionConfig(distributionStatisticConfig);
this.supportsAggregablePercentiles = supportsAggregablePercentiles;

final int ageBuckets = distributionStatisticConfig.getBufferLength();

Expand Down Expand Up @@ -125,7 +120,11 @@ void initRingBuffer() {

abstract double valueAtPercentile(double percentile);

abstract Iterator<CountAtBucket> countsAtValues(Iterator<Double> values);
/**
* @return counts at the monitored histogram buckets for this histogram
*/
@Nullable
abstract CountAtBucket[] countsAtBuckets();

void outputSummary(PrintStream out, double bucketScaling) {
}
Expand All @@ -139,7 +138,7 @@ public final HistogramSnapshot takeSnapshot(long count, double total, double max
synchronized (this) {
accumulateIfStale();
values = takeValueSnapshot();
counts = takeCountSnapshot();
counts = countsAtBuckets();
}

return new HistogramSnapshot(count, total, max, values, counts, this::outputSummary);
Expand All @@ -166,25 +165,6 @@ private ValueAtPercentile[] takeValueSnapshot() {
return values;
}

private CountAtBucket[] takeCountSnapshot() {
if (!distributionStatisticConfig.isPublishingHistogram()) {
return null;
}

final Set<Double> monitoredValues = distributionStatisticConfig
.getHistogramBuckets(supportsAggregablePercentiles);
if (monitoredValues.isEmpty()) {
return null;
}

final CountAtBucket[] counts = new CountAtBucket[monitoredValues.size()];
final Iterator<CountAtBucket> iterator = countsAtValues(monitoredValues.iterator());
for (int i = 0; i < counts.length; i++) {
counts[i] = iterator.next();
}
return counts;
}

public void recordLong(long value) {
rotate();
try {
Expand Down
Loading