Skip to content

Commit 6fca20c

Browse files
Add concurrency to exponential histogram
- Rescaling is not frequent behaviour (for cumulative scale should normalize after initial few recording, for delta it could increase/decrease but would be mostly stable). Instead of opting for a full synchronization on writes (which would have less performance), the choice of ReadWriteLock is chosen.
1 parent 9018b50 commit 6fca20c

File tree

5 files changed

+105
-40
lines changed

5 files changed

+105
-40
lines changed

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogram.java

+72-14
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.LongAdder;
25+
import java.util.concurrent.locks.ReentrantReadWriteLock;
2526
import java.util.stream.Collectors;
2627

2728
import io.micrometer.common.lang.Nullable;
@@ -49,6 +50,21 @@
4950
*/
5051
public abstract class Base2ExponentialHistogram implements Histogram {
5152

53+
/*
54+
* This is an alternative to full-blown synchronization on the Histogram which would
55+
* severely affect the performance under high concurrent cases.
56+
*
57+
* Read lock guards adding data into histogram i,e multiple values can be recorded at
58+
* the same time for a given scale. If scale needs to be changed, write-lock need to
59+
* be acquired and all the operations are blocked until new scale is determined and
60+
* updated with.
61+
*/
62+
private final ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
63+
64+
private final ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
65+
66+
private final ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
67+
5268
private final int maxScale;
5369

5470
private final int maxBucketsCount;
@@ -131,13 +147,33 @@ public HistogramSnapshot takeSnapshot(final long count, final double total, fina
131147
}
132148

133149
/**
134-
* Returns the snapshot of current recorded values.
150+
* Returns the snapshot of current recorded values. This method is thread-safe and
151+
* block concurrent recordings to the underlying histogram.
135152
*/
136-
ExponentialHistogramSnapShot getCurrentValuesSnapshot() {
137-
return (circularCountHolder.isEmpty() && zeroCount.longValue() == 0)
138-
? DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale)
139-
: new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(), zeroThreshold,
140-
new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET);
153+
ExponentialHistogramSnapShot getCurrentValuesSnapshot(final boolean shouldResetHistogram) {
154+
writeLock.lock();
155+
try {
156+
ExponentialHistogramSnapShot exponentialHistogramSnapShot = getExponentialHistogramSnapShot();
157+
if (shouldResetHistogram)
158+
reset();
159+
return exponentialHistogramSnapShot;
160+
}
161+
finally {
162+
writeLock.unlock();
163+
}
164+
}
165+
166+
private ExponentialHistogramSnapShot getExponentialHistogramSnapShot() {
167+
ExponentialHistogramSnapShot exponentialHistogramSnapShot;
168+
if (circularCountHolder.isEmpty() && zeroCount.longValue() == 0) {
169+
// Will only be possible for a delta variant
170+
exponentialHistogramSnapShot = DefaultExponentialHistogramSnapShot.getEmptySnapshotForScale(scale);
171+
}
172+
else {
173+
exponentialHistogramSnapShot = new DefaultExponentialHistogramSnapShot(scale, zeroCount.longValue(),
174+
zeroThreshold, new ExponentialBuckets(getOffset(), getBucketCounts()), EMPTY_EXPONENTIAL_BUCKET);
175+
}
176+
return exponentialHistogramSnapShot;
141177
}
142178

143179
/**
@@ -168,24 +204,46 @@ public void recordDouble(double value) {
168204
return;
169205
}
170206

171-
int index = base2IndexProvider.getIndexForValue(value);
172-
if (!circularCountHolder.increment(index, 1)) {
173-
synchronized (this) {
174-
int downScaleFactor = getDownScaleFactor(index);
207+
recordInternal(value);
208+
}
209+
210+
private void recordInternal(double value) {
211+
boolean hasRecorded;
212+
readLock.lock();
213+
try {
214+
hasRecorded = incrementValue(value);
215+
}
216+
finally {
217+
readLock.unlock();
218+
}
219+
220+
if (!hasRecorded) {
221+
// If recording fails, then we MIGHT have to re-scale. Try re-scaling.
222+
writeLock.lock();
223+
try {
224+
final int downScaleFactor = getDownScaleFactor(base2IndexProvider.getIndexForValue(value));
175225
downScale(downScaleFactor);
176-
index = base2IndexProvider.getIndexForValue(value);
177-
circularCountHolder.increment(index, 1);
226+
// Record the value within the write lock to guarantee write.
227+
incrementValue(value);
228+
}
229+
finally {
230+
writeLock.unlock();
178231
}
179232
}
180233
}
181234

235+
private boolean incrementValue(final double value) {
236+
return circularCountHolder.increment(base2IndexProvider.getIndexForValue(value), 1);
237+
}
238+
182239
/**
183240
* Reduces the scale of the histogram by downScaleFactor. The buckets are merged to
184241
* align with the exponential scale.
185242
* @param downScaleFactor - the factor to downscale this histogram.
186243
*/
187244
private void downScale(int downScaleFactor) {
188245
if (downScaleFactor == 0) {
246+
// Should never happen.
189247
return;
190248
}
191249

@@ -286,12 +344,12 @@ private List<Long> getBucketCounts() {
286344
* Reset the current values and possibly increase the scale based on current recorded
287345
* values;
288346
*/
289-
synchronized void reset() {
347+
// VisibleForTesting
348+
void reset() {
290349
int upscaleFactor = getUpscaleFactor();
291350
if (upscaleFactor > 0) {
292351
this.updateScale(this.scale + upscaleFactor);
293352
}
294-
295353
this.circularCountHolder.reset();
296354
this.zeroCount.reset();
297355
}

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CircularCountHolder.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,19 @@ else if (index < startIndex) {
8181
startIndex = index;
8282
}
8383

84-
counts.addAndGet(getRelativeIndex(index), incrementBy);
85-
return true;
84+
final int relativeIndex = getRelativeIndex(index);
85+
if (relativeIndex >= 0 && relativeIndex < length) {
86+
counts.addAndGet(relativeIndex, incrementBy);
87+
return true;
88+
}
89+
/*
90+
* This should not happen if the writes are fully exclusive but that would mean a
91+
* poor performance. This case only possible (rarely) during initial writes on th
92+
* histogram where there might be concurrent threads modifying the index and if it
93+
* not fully synchronized externally. In this case, we would notify the caller
94+
* that write has failed and let the caller take a call on what to do.
95+
*/
96+
return false;
8697
}
8798

8899
private int getRelativeIndex(int index) {

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/CumulativeBase2ExponentialHistogram.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() {
5959
}
6060

6161
@Override
62-
synchronized void takeExponentialHistogramSnapShot() {
63-
this.exponentialHistogramSnapShot = getCurrentValuesSnapshot();
62+
void takeExponentialHistogramSnapShot() {
63+
this.exponentialHistogramSnapShot = getCurrentValuesSnapshot(false);
6464
}
6565

6666
}

implementations/micrometer-registry-otlp/src/main/java/io/micrometer/registry/otlp/internal/DeltaBase2ExponentialHistogram.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public ExponentialHistogramSnapShot getLatestExponentialHistogramSnapshot() {
6666
}
6767

6868
@Override
69-
synchronized void takeExponentialHistogramSnapShot() {
69+
void takeExponentialHistogramSnapShot() {
7070
stepExponentialHistogramSnapShot.poll();
7171
}
7272

@@ -82,12 +82,8 @@ public StepExponentialHistogramSnapShot(final Clock clock, final long stepMillis
8282
}
8383

8484
@Override
85-
protected synchronized Supplier<ExponentialHistogramSnapShot> valueSupplier() {
86-
return () -> {
87-
ExponentialHistogramSnapShot latestSnapShot = getCurrentValuesSnapshot();
88-
reset();
89-
return latestSnapShot;
90-
};
85+
protected Supplier<ExponentialHistogramSnapShot> valueSupplier() {
86+
return () -> getCurrentValuesSnapshot(true);
9187
}
9288

9389
@Override

implementations/micrometer-registry-otlp/src/test/java/io/micrometer/registry/otlp/internal/Base2ExponentialHistogramTest.java

+15-15
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ void testRecordDouble() {
5151
// 1 Always belongs to index 0.
5252
base2ExponentialHistogram.recordDouble(1.000000000001);
5353
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE);
54-
assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot().zeroCount()).isZero();
55-
assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot())).isEqualTo(1);
54+
assertThat(base2ExponentialHistogram.getCurrentValuesSnapshot(false).zeroCount()).isZero();
55+
assertThat(getAllBucketsCountSum(base2ExponentialHistogram.getCurrentValuesSnapshot(false))).isEqualTo(1);
5656
}
5757

5858
@Test
@@ -66,7 +66,7 @@ void testRecordTimeBased() {
6666
// calling
6767
// recordDouble(2).
6868

69-
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
69+
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
7070
assertThat(currentSnapshot.zeroCount()).isZero();
7171
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
7272
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);
@@ -84,13 +84,13 @@ void testRecordTimeBasedInSeconds() {
8484
// This should be same as calling recordDouble(0.05).
8585
base2ExponentialHistogram.recordLong(Duration.ofMillis(50).toNanos());
8686

87-
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
87+
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
8888
assertThat(currentSnapshot.zeroCount()).isZero();
8989
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
9090
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);
9191

9292
base2ExponentialHistogram.recordLong(Duration.ofMillis(90).toNanos());
93-
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
93+
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
9494
assertThat(currentSnapshot.scale()).isEqualTo(1);
9595
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(3);
9696
}
@@ -101,7 +101,7 @@ void testZeroThreshHold() {
101101
base2ExponentialHistogram.recordDouble(0.0);
102102
base2ExponentialHistogram.recordDouble(0.5);
103103

104-
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
104+
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
105105
assertThat(currentSnapshot.zeroThreshold()).isLessThan(1).isGreaterThan(0);
106106
assertThat(currentSnapshot.zeroCount()).isEqualTo(2);
107107
assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE);
@@ -113,7 +113,7 @@ void testZeroThreshHold() {
113113
base2ExponentialHistogramWithZeroAsMin.recordDouble(Math.nextUp(0.0));
114114

115115
ExponentialHistogramSnapShot snapshotWithZeroAsMin = base2ExponentialHistogramWithZeroAsMin
116-
.getCurrentValuesSnapshot();
116+
.getCurrentValuesSnapshot(false);
117117
assertThat(snapshotWithZeroAsMin.zeroThreshold()).isEqualTo(0.0);
118118
assertThat(snapshotWithZeroAsMin.zeroCount()).isEqualTo(1);
119119
assertThat(snapshotWithZeroAsMin.scale()).isEqualTo(MAX_SCALE);
@@ -124,7 +124,7 @@ void testZeroThreshHold() {
124124
void testDownScale() {
125125
base2ExponentialHistogram.recordDouble(1.0001);
126126

127-
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
127+
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
128128
assertThat(currentSnapshot.zeroCount()).isZero();
129129
assertThat(currentSnapshot.scale()).isEqualTo(MAX_SCALE);
130130
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(1);
@@ -182,11 +182,11 @@ void testUpscale() {
182182

183183
@Test
184184
void testValuesAtIndices() {
185-
ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
185+
ExponentialHistogramSnapShot currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
186186
assertThat(currentValueSnapshot.positive().bucketCounts()).isEmpty();
187187

188188
base2ExponentialHistogram.recordDouble(1.0001);
189-
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
189+
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
190190
assertThat(currentValueSnapshot.positive().offset()).isZero();
191191
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1);
192192
assertThat(currentValueSnapshot.positive().bucketCounts()).filteredOn(value -> value == 0).isEmpty();
@@ -195,7 +195,7 @@ void testValuesAtIndices() {
195195

196196
base2ExponentialHistogram.recordDouble(1.0076);
197197
base2ExponentialHistogram.recordDouble(1.008);
198-
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
198+
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
199199
assertThat(currentValueSnapshot.positive().offset()).isZero();
200200
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE);
201201
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(1);
@@ -206,7 +206,7 @@ void testValuesAtIndices() {
206206
// We will record a value that will downscale by 1 and this should merge 2
207207
// consecutive buckets into one.
208208
base2ExponentialHistogram.recordDouble(1.012);
209-
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
209+
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
210210
assertThat(currentValueSnapshot.positive().offset()).isZero();
211211
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(MAX_SCALE - 1);
212212
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(2);
@@ -216,7 +216,7 @@ void testValuesAtIndices() {
216216

217217
// The base will reduced by a factor of more than one,
218218
base2ExponentialHistogram.recordDouble(4);
219-
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
219+
currentValueSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
220220
assertThat(currentValueSnapshot.positive().offset()).isZero();
221221
assertThat(base2ExponentialHistogram.getScale()).isEqualTo(3);
222222
assertThat(currentValueSnapshot.positive().bucketCounts().get(0)).isEqualTo(5);
@@ -242,15 +242,15 @@ void reset() {
242242
base2ExponentialHistogram.recordDouble(1);
243243
base2ExponentialHistogram.recordDouble(2);
244244

245-
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
245+
ExponentialHistogramSnapShot currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
246246
final int intialScale = currentSnapshot.scale();
247247
assertThat(currentSnapshot.zeroCount()).isEqualTo(1);
248248
assertThat(currentSnapshot.scale()).isLessThan(MAX_SCALE);
249249
assertThat(currentSnapshot.positive().offset()).isNotZero();
250250
assertThat(getAllBucketsCountSum(currentSnapshot)).isEqualTo(2);
251251

252252
base2ExponentialHistogram.reset();
253-
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot();
253+
currentSnapshot = base2ExponentialHistogram.getCurrentValuesSnapshot(false);
254254
assertThat(currentSnapshot.zeroCount()).isZero();
255255
assertThat(currentSnapshot.scale()).isEqualTo(intialScale);
256256
assertThat(currentSnapshot.positive().offset()).isZero();

0 commit comments

Comments
 (0)