Skip to content

Commit

Permalink
Fix Stackdriver Distribution count and bucket count sum mismatch
Browse files Browse the repository at this point in the history
The count passed to the `HistogramSnapshot` is from the StepTimer and is step-based, while the histogram buckets have a separate time window. This mismatch in time windows caused the two to generally not match which causes a validation error when publishing Distribution metrics to Stackdriver.

This solves this by tracking an infinity bucket in the histogram so we can get a consistent count in the same time window as the rest of the histogram.
  • Loading branch information
shakuzen committed Jan 30, 2025
1 parent e73ead8 commit f77da77
Show file tree
Hide file tree
Showing 10 changed files with 395 additions and 113 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.*;
import io.micrometer.core.instrument.step.StepDistributionSummary;

import static io.micrometer.stackdriver.StackdriverHistogramUtil.stackdriverHistogram;

class StackdriverDistributionSummary extends StepDistributionSummary {

public StackdriverDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis) {
super(id, clock, distributionStatisticConfig, scale, stepMillis,
stackdriverHistogram(clock, distributionStatisticConfig));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.*;

final class StackdriverHistogramUtil {

private StackdriverHistogramUtil() {
}

// copied and modified from AbstractDistributionSummary/AbstractTimer
static Histogram stackdriverHistogram(Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
if (distributionStatisticConfig.isPublishingPercentiles()) {
return new TimeWindowPercentileHistogram(clock, distributionStatisticConfig, true, false, true);
}
if (distributionStatisticConfig.isPublishingHistogram()) {
return new StackdriverFixedBoundaryHistogram(clock, distributionStatisticConfig);
}
return NoopHistogram.INSTANCE;
}

// Can't do this because java: cannot access org.HdrHistogram.DoubleRecorder
/*
* static class StackdriverClientSidePercentilesHistogram extends
* TimeWindowPercentileHistogram {
*
* public StackdriverClientSidePercentilesHistogram(Clock clock,
* DistributionStatisticConfig distributionStatisticConfig) { super(clock,
* distributionStatisticConfig, true, false, true); }
*
* }
*/

static class StackdriverFixedBoundaryHistogram extends TimeWindowFixedBoundaryHistogram {

StackdriverFixedBoundaryHistogram(Clock clock, DistributionStatisticConfig config) {
super(clock, config, true, false, true);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.HistogramSupport;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepDistributionSummary;
import io.micrometer.core.instrument.step.StepMeterRegistry;
import io.micrometer.core.instrument.step.StepTimer;
import io.micrometer.core.instrument.util.DoubleFormat;
import io.micrometer.core.instrument.util.NamedThreadFactory;
import org.slf4j.Logger;
Expand All @@ -46,7 +44,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -289,15 +286,15 @@ Stream<TimeSeries> createLongTaskTimer(Batch batch, LongTaskTimer longTaskTimer)
@Override
protected DistributionSummary newDistributionSummary(Meter.Id id,
DistributionStatisticConfig distributionStatisticConfig, double scale) {
return new StepDistributionSummary(id, clock, distributionStatisticConfig, scale, config.step().toMillis(),
true);
return new StackdriverDistributionSummary(id, clock, distributionStatisticConfig, scale,
config.step().toMillis());
}

@Override
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
return new StepTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
this.config.step().toMillis(), true);
return new StackdriverTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(),
this.config.step().toMillis());
}

@Override
Expand Down Expand Up @@ -508,18 +505,17 @@ private TimeInterval interval(MetricDescriptor.MetricKind metricKind) {
Distribution distribution(HistogramSnapshot snapshot, boolean timeDomain) {
CountAtBucket[] histogram = snapshot.histogramCounts();

// selected finite buckets (represented as a normal histogram)
AtomicLong truncatedSum = new AtomicLong();
AtomicReference<Double> last = new AtomicReference<>(0.0);
List<Long> bucketCounts = Arrays.stream(histogram).map(countAtBucket -> {
double cumulativeCount = countAtBucket.count();
long bucketCount = (long) (cumulativeCount - last.getAndSet(cumulativeCount));
truncatedSum.addAndGet(bucketCount);
return bucketCount;
}).collect(toCollection(ArrayList::new));

if (!bucketCounts.isEmpty()) {
int endIndex = bucketCounts.size() - 1;
List<Long> bucketCounts = Arrays.stream(histogram)
.map(CountAtBucket::count)
.map(Double::longValue)
.collect(toCollection(ArrayList::new));
long cumulativeCount = Arrays.stream(histogram).mapToLong(c -> (long) c.count()).sum();

// no-op histogram will have no buckets; other histograms should have at least
// the +Inf bucket
if (!bucketCounts.isEmpty() && bucketCounts.size() > 1) {
// the rightmost bucket should be the infinity bucket; do not trim that
int endIndex = bucketCounts.size() - 2;
// trim zero-count buckets on the right side of the domain
if (bucketCounts.get(endIndex) == 0) {
int lastNonZeroIndex = 0;
Expand All @@ -529,18 +525,24 @@ Distribution distribution(HistogramSnapshot snapshot, boolean timeDomain) {
break;
}
}
long infCount = bucketCounts.get(bucketCounts.size() - 1);
bucketCounts = bucketCounts.subList(0, lastNonZeroIndex + 1);
// infinite bucket count of 0 can be omitted
bucketCounts.add(infCount);
}
}

// add the "+infinity" bucket, which does NOT have a corresponding bucket
// boundary
bucketCounts.add(Math.max(0, snapshot.count() - truncatedSum.get()));
// Stackdriver Distribution must have at least one bucket count
if (bucketCounts.isEmpty()) {
bucketCounts.add(0L);
}

List<Double> bucketBoundaries = Arrays.stream(histogram)
.map(countAtBucket -> timeDomain ? countAtBucket.bucket(getBaseTimeUnit()) : countAtBucket.bucket())
.filter(bucket -> bucket != Double.POSITIVE_INFINITY)
.collect(toCollection(ArrayList::new));

// trim bucket boundaries to match bucket count trimming
if (bucketBoundaries.size() != bucketCounts.size() - 1) {
bucketBoundaries = bucketBoundaries.subList(0, bucketCounts.size() - 1);
}
Expand All @@ -551,8 +553,10 @@ Distribution distribution(HistogramSnapshot snapshot, boolean timeDomain) {
}

return Distribution.newBuilder()
// is the mean optional? better to not send as it is for a different time
// window than the histogram
.setMean(timeDomain ? snapshot.mean(getBaseTimeUnit()) : snapshot.mean())
.setCount(snapshot.count())
.setCount(cumulativeCount)
.setBucketOptions(Distribution.BucketOptions.newBuilder()
.setExplicitBuckets(
Distribution.BucketOptions.Explicit.newBuilder().addAllBounds(bucketBoundaries).build())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2025 VMware, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micrometer.stackdriver;

import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTimer;

import java.util.concurrent.TimeUnit;

import static io.micrometer.stackdriver.StackdriverHistogramUtil.stackdriverHistogram;

class StackdriverTimer extends StepTimer {

public StackdriverTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepDurationMillis) {
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, stepDurationMillis,
stackdriverHistogram(clock, distributionStatisticConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import com.google.api.Distribution;
import io.micrometer.common.lang.Nullable;
import io.micrometer.core.Issue;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.MockClock;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.distribution.CountAtBucket;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -53,10 +56,16 @@ public String get(String key) {
@Test
@Issue("#1325")
void distributionCountBucketsInfinityBucketIsNotNegative() {
DistributionSummary ds = DistributionSummary.builder("ds").serviceLevelObjectives(1, 2).register(meterRegistry);
ds.record(1);
ds.record(1);
ds.record(2);
ds.record(2);
ds.record(2);
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
// count is 4, but sum of bucket counts is 5 due to inconsistent snapshotting
HistogramSnapshot histogramSnapshot = new HistogramSnapshot(4, 14.7, 5, null,
new CountAtBucket[] { new CountAtBucket(1.0, 2), new CountAtBucket(2.0, 5) }, null);
// count is 0 from previous step, but sum of bucket counts is 5
HistogramSnapshot histogramSnapshot = ds.takeSnapshot();
assertThat(histogramSnapshot.count()).isEqualTo(0);
Distribution distribution = batch.distribution(histogramSnapshot, false);
List<Long> bucketCountsList = distribution.getBucketCountsList();
assertThat(bucketCountsList.get(bucketCountsList.size() - 1)).isNotNegative();
Expand All @@ -65,11 +74,99 @@ void distributionCountBucketsInfinityBucketIsNotNegative() {
@Test
@Issue("#2045")
void batchDistributionWhenHistogramSnapshotIsEmpty() {
// no SLOs, percentiles, or percentile histogram configured => no-op histogram
DistributionSummary ds = DistributionSummary.builder("ds").register(meterRegistry);
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
HistogramSnapshot histogramSnapshot = HistogramSnapshot.empty(0, 0.0, 0.0);
HistogramSnapshot histogramSnapshot = ds.takeSnapshot();
assertThat(histogramSnapshot.histogramCounts()).isEmpty();
assertThat(histogramSnapshot.percentileValues()).isEmpty();
Distribution distribution = batch.distribution(histogramSnapshot, false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsCount()).isEqualTo(1);
assertThat(distribution.getBucketCountsList()).hasSize(1);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).containsExactly(0d);
assertThat(distribution.getBucketCountsList()).containsExactly(0L);
}

// gh-4868 is an issue when the step count is less than the histogram count
@Test
void distributionCountMustEqualBucketCountsSum() {
DistributionSummary ds = DistributionSummary.builder("ds").serviceLevelObjectives(1, 2).register(meterRegistry);
ds.record(1);
ds.record(1);
ds.record(2);
ds.record(3);
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
HistogramSnapshot histogramSnapshot = ds.takeSnapshot();
assertThat(histogramSnapshot.count()).isEqualTo(0);
assertThat(histogramSnapshot.histogramCounts()).containsExactly(new CountAtBucket(1d, 2),
new CountAtBucket(2d, 1), new CountAtBucket(Double.POSITIVE_INFINITY, 1));
Distribution distribution = batch.distribution(histogramSnapshot, false);
assertThat(distribution.getCount())
.isEqualTo(distribution.getBucketCountsList().stream().mapToLong(Long::longValue).sum());
}

@Test
void distributionWithTimerShouldHaveInfinityBucket() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
Timer timer = Timer.builder("timer")
.serviceLevelObjectives(Duration.ofMillis(1), Duration.ofMillis(2))
.register(meterRegistry);
timer.record(Duration.ofMillis(1));
timer.record(Duration.ofMillis(2));
timer.record(Duration.ofMillis(2));
timer.record(Duration.ofMillis(3));

HistogramSnapshot histogramSnapshot = timer.takeSnapshot();
assertThat(histogramSnapshot.histogramCounts()).contains(new CountAtBucket(Double.POSITIVE_INFINITY, 1));
Distribution distribution = batch.distribution(histogramSnapshot, true);
assertThat(distribution.getCount())
.isEqualTo(distribution.getBucketCountsList().stream().mapToLong(Long::longValue).sum());
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsCount()).isEqualTo(2);
// one more count than boundaries for the infinity bucket
assertThat(distribution.getBucketCountsList()).hasSize(3);
}

@Test
void distributionWithPercentileHistogram() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
DistributionSummary ds = DistributionSummary.builder("ds").publishPercentileHistogram().register(meterRegistry);
ds.record(1);
ds.record(2);
ds.record(3);
ds.record(4);
ds.record(23);

Distribution distribution = batch.distribution(ds.takeSnapshot(), false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).hasSize(17)
.as("trimmed zero count buckets")
.endsWith(26d);
assertThat(distribution.getBucketCountsList()).hasSize(18).as("Infinity bucket count should be 0").endsWith(0L);
}

@Test
void distributionWithOnlyClientSidePercentilesIsEmpty() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
DistributionSummary ds = DistributionSummary.builder("ds")
.publishPercentiles(0.5, 0.99)
.register(meterRegistry);
ds.record(1);

Distribution distribution = batch.distribution(ds.takeSnapshot(), false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).containsExactly(0d);
assertThat(distribution.getBucketCountsList()).containsExactly(0L);
}

@Test
void distributionWithClientSidePercentilesAndBuckets() {
StackdriverMeterRegistry.Batch batch = meterRegistry.new Batch();
DistributionSummary ds = DistributionSummary.builder("ds")
.publishPercentiles(0.5, 0.99)
.serviceLevelObjectives(3, 4, 5)
.register(meterRegistry);
ds.record(1);
ds.record(5);

Distribution distribution = batch.distribution(ds.takeSnapshot(), false);
assertThat(distribution.getBucketOptions().getExplicitBuckets().getBoundsList()).containsExactly(3d, 4d, 5d);
assertThat(distribution.getBucketCountsList()).containsExactly(1L, 0L, 1L, 0L);
}

}
Loading

0 comments on commit f77da77

Please sign in to comment.