Skip to content

Commit

Permalink
protect writes while flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
maciuszek committed Jan 16, 2025
1 parent 858a3fd commit 4e9611d
Showing 1 changed file with 26 additions and 8 deletions.
34 changes: 26 additions & 8 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ type timer interface {
GetValue(int) float64
ValueCount() int
SampleRate() float64
Lock()
Unlock()
Reset()
}

Expand Down Expand Up @@ -362,15 +364,21 @@ func (t *standardTimer) SampleRate() float64 {
return 1.0 // metrics which are not sampled have an implicit sample rate 1.0
}

// no support or need for concurrency
func (t *standardTimer) Lock() {}

// no support or need for concurrency
func (t *standardTimer) Unlock() {}

// nothing to persisted in memroy for this timer
func (t *standardTimer) Reset() {}

type reservoirTimer struct {
mu sync.Mutex
base time.Duration
name string
ringSize uint64 // just used so that we don't have to re-evaluate capacity of values
ringMask uint64
ringSize uint64 // this value shouldn't change, just used so that we don't have to re-evaluate capacity of values
ringMask uint64 // this value shouldn't change
values []float64
count uint64
}
Expand All @@ -387,7 +395,6 @@ func (t *reservoirTimer) AddValue(value float64) {
t.mu.Lock()
defer t.mu.Unlock()

// t.ringMask isn't ever changed so the access to should fine
t.values[atomic.LoadUint64(&t.count)&t.ringMask] = value
atomic.AddUint64(&t.count, 1)
}
Expand All @@ -397,15 +404,12 @@ func (t *reservoirTimer) AllocateSpan() Timespan {
}

func (t *reservoirTimer) GetValue(index int) float64 {
t.mu.Lock()
defer t.mu.Unlock()

return t.values[index]
}

func (t *reservoirTimer) ValueCount() int {
count := atomic.LoadUint64(&t.count)
ringSize := atomic.LoadUint64(&t.ringSize)
ringSize := t.ringSize

if count > ringSize {
return int(ringSize)
Expand All @@ -415,14 +419,22 @@ func (t *reservoirTimer) ValueCount() int {

func (t *reservoirTimer) SampleRate() float64 {
count := atomic.LoadUint64(&t.count)
ringSize := atomic.LoadUint64(&t.ringSize)
ringSize := t.ringSize

if count <= ringSize {
return 1.0
}
return float64(ringSize) / float64(count)
}

func (t *reservoirTimer) Lock() {
t.mu.Lock()
}

func (t *reservoirTimer) Unlock() {
t.mu.Unlock()
}

func (t *reservoirTimer) Reset() {
atomic.StoreUint64(&t.count, 0)
}
Expand Down Expand Up @@ -504,6 +516,11 @@ func (s *statStore) Flush() {

s.timers.Range(func(key, v interface{}) bool {
if timer, ok := v.(*reservoirTimer); ok {
// lock while flushing to:
// 1. provide correct sample rate
// 2. allow for exit despite continuous writes
// 3. reduce metric loss from writes after flush and before reset
timer.Lock()
sampleRate := timer.SampleRate()

// since the map memory is reused only process what we accumulated in the current processing itteration
Expand All @@ -512,6 +529,7 @@ func (s *statStore) Flush() {
}

timer.Reset()
timer.Unlock()
}

return true
Expand Down

0 comments on commit 4e9611d

Please sign in to comment.