diff --git a/stats.go b/stats.go index 03457c8..071c887 100644 --- a/stats.go +++ b/stats.go @@ -325,6 +325,8 @@ type timer interface { GetValue(int) float64 ValueCount() int SampleRate() float64 + Lock() + Unlock() Reset() } @@ -362,6 +364,12 @@ func (t *standardTimer) SampleRate() float64 { return 1.0 // metrics which are not sampled have an implicit sample rate 1.0 } +// no support or need for concurrency +func (t *standardTimer) Lock() {} + +// no support or need for concurrency +func (t *standardTimer) Unlock() {} + // nothing to persisted in memroy for this timer func (t *standardTimer) Reset() {} @@ -369,8 +377,8 @@ type reservoirTimer struct { mu sync.Mutex base time.Duration name string - ringSize uint64 // just used so that we don't have to re-evaluate capacity of values - ringMask uint64 + ringSize uint64 // this value shouldn't change, just used so that we don't have to re-evaluate capacity of values + ringMask uint64 // this value shouldn't change values []float64 count uint64 } @@ -387,7 +395,6 @@ func (t *reservoirTimer) AddValue(value float64) { t.mu.Lock() defer t.mu.Unlock() - // t.ringMask isn't ever changed so the access to should fine t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value atomic.AddUint64(&t.count, 1) } @@ -397,15 +404,12 @@ func (t *reservoirTimer) AllocateSpan() Timespan { } func (t *reservoirTimer) GetValue(index int) float64 { - t.mu.Lock() - defer t.mu.Unlock() - return t.values[index] } func (t *reservoirTimer) ValueCount() int { count := atomic.LoadUint64(&t.count) - ringSize := atomic.LoadUint64(&t.ringSize) + ringSize := t.ringSize if count > ringSize { return int(ringSize) @@ -415,7 +419,7 @@ func (t *reservoirTimer) ValueCount() int { func (t *reservoirTimer) SampleRate() float64 { count := atomic.LoadUint64(&t.count) - ringSize := atomic.LoadUint64(&t.ringSize) + ringSize := t.ringSize if count <= ringSize { return 1.0 @@ -423,6 +427,14 @@ func (t *reservoirTimer) SampleRate() float64 { return float64(ringSize) / float64(count) } +func (t *reservoirTimer) Lock() { + t.mu.Lock() +} + +func (t *reservoirTimer) Unlock() { + t.mu.Unlock() +} + func (t *reservoirTimer) Reset() { atomic.StoreUint64(&t.count, 0) } @@ -504,6 +516,11 @@ func (s *statStore) Flush() { s.timers.Range(func(key, v interface{}) bool { if timer, ok := v.(*reservoirTimer); ok { + // lock while flushing to: + // 1. provide correct sample rate + // 2. allow for exit despite continuous writes + // 3. reduce metric loss from writes after flush and before reset + timer.Lock() sampleRate := timer.SampleRate() // since the map memory is reused only process what we accumulated in the current processing itteration @@ -512,6 +529,7 @@ func (s *statStore) Flush() { } timer.Reset() + timer.Unlock() } return true