Skip to content

Commit 3b65338

Browse files
committed
stats: delete unused Counters and Timers when flushing
This commit changes the statsStore to remove Counters and Timers that were not used N times between flush intervals. This should alleviate memory pressure during cardinality explosions.
1 parent e6b88e6 commit 3b65338

File tree

2 files changed

+138
-1
lines changed

2 files changed

+138
-1
lines changed

stats.go

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,14 @@ import (
1010
tagspkg "github.com/lyft/gostats/internal/tags"
1111
)
1212

13+
// Counter and Timer metrics that were zero after unusedMetricPruneCount are
14+
// deleted from the statStore.
15+
//
16+
// TODO: consider a time interval for this instead of basing it off flush
17+
// count. This guards against aggressively short/long flush intervals, which
18+
// are configurable and thus out of our control.
19+
const unusedMetricPruneCount = 4
20+
1321
// A Store holds statistics.
1422
// There are two options when creating a new store:
1523
//
@@ -238,6 +246,8 @@ func NewDefaultStore() Store {
238246
type counter struct {
239247
currentValue uint64
240248
lastSentValue uint64
249+
// number of times this counter had a value of zero during flush
250+
zeroCount uint32
241251
}
242252

243253
func (c *counter) Add(delta uint64) {
@@ -302,6 +312,12 @@ type timer struct {
302312
base time.Duration
303313
name string
304314
sink Sink
315+
// active is a boolean used to check if the timer was used between flushes
316+
active uint32
317+
// zeroCount is the number of times the timer was not used between
318+
// flushes - if exceeds unusedMetricPruneCount the timer is deleted
319+
// from the Store.
320+
zeroCount uint32
305321
}
306322

307323
func (t *timer) time(dur time.Duration) {
@@ -313,6 +329,7 @@ func (t *timer) AddDuration(dur time.Duration) {
313329
}
314330

315331
func (t *timer) AddValue(value float64) {
332+
atomic.StoreUint32(&t.active, 1)
316333
t.sink.FlushTimer(t.name, value)
317334
}
318335

@@ -369,10 +386,35 @@ func (s *statStore) Flush() {
369386
}
370387
s.mu.RUnlock()
371388

389+
// This is kinda slow and does not write to the sink so run it in a
390+
// separate goroutine.
391+
wg := new(sync.WaitGroup)
392+
wg.Add(1)
393+
go func(timers *sync.Map) {
394+
defer wg.Done()
395+
timers.Range(func(key, v interface{}) bool {
396+
timer := v.(*timer)
397+
switch {
398+
case atomic.SwapUint32(&timer.active, 0) != 0:
399+
atomic.StoreUint32(&timer.zeroCount, 0)
400+
case atomic.AddUint32(&timer.zeroCount, 1) >= unusedMetricPruneCount:
401+
timers.Delete(key)
402+
}
403+
return true
404+
})
405+
}(&s.timers)
406+
372407
s.counters.Range(func(key, v interface{}) bool {
408+
c := v.(*counter)
409+
value := c.latch()
410+
switch {
373411
// do not flush counters that are set to zero
374-
if value := v.(*counter).latch(); value != 0 {
412+
case value != 0:
375413
s.sink.FlushCounter(key.(string), value)
414+
atomic.StoreUint32(&c.zeroCount, 0)
415+
// delete unused counters
416+
case atomic.AddUint32(&c.zeroCount, 1) >= unusedMetricPruneCount:
417+
s.counters.Delete(key)
376418
}
377419
return true
378420
})
@@ -386,6 +428,10 @@ func (s *statStore) Flush() {
386428
if ok {
387429
flushableSink.Flush()
388430
}
431+
432+
// Wait for the goroutine pruning timers to finish. This prevents an
433+
// explosion of goroutines if someone calls Flush in a hot loop.
434+
wg.Wait()
389435
}
390436

391437
func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {

stats_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,45 @@ func TestPerInstanceStats(t *testing.T) {
379379
})
380380
}
381381

382+
func TestStatsStorePrune(t *testing.T) {
383+
s := NewStore(nullSink{}, false).(*statStore)
384+
tick := time.NewTicker(time.Hour) // don't flush automatically
385+
defer tick.Stop()
386+
go s.Start(tick)
387+
388+
const N = 1024
389+
for i := 0; i < N; i++ {
390+
id := strconv.Itoa(i)
391+
s.NewCounter("counter_" + id)
392+
s.NewTimer("timer_" + id)
393+
}
394+
395+
mlen := func(m *sync.Map) int {
396+
n := 0
397+
m.Range(func(_, _ any) bool {
398+
n++
399+
return true
400+
})
401+
return n
402+
}
403+
404+
for i := 0; i < unusedMetricPruneCount; i++ {
405+
if n := mlen(&s.counters); n != N {
406+
t.Errorf("len(s.counters) == %d; want: %d", n, N)
407+
}
408+
if n := mlen(&s.timers); n != N {
409+
t.Errorf("len(s.timers) == %d; want: %d", n, N)
410+
}
411+
s.Flush()
412+
}
413+
if n := mlen(&s.counters); n != 0 {
414+
t.Errorf("len(s.counters) == %d; want: %d", n, 0)
415+
}
416+
if n := mlen(&s.timers); n != 0 {
417+
t.Errorf("len(s.timers) == %d; want: %d", n, 0)
418+
}
419+
}
420+
382421
func BenchmarkStore_MutexContention(b *testing.B) {
383422
s := NewStore(nullSink{}, false)
384423
t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics
@@ -493,3 +532,55 @@ func BenchmarkStoreNewPerInstanceCounter(b *testing.B) {
493532
}
494533
})
495534
}
535+
536+
func BenchmarkStoreNewCounterParallel(b *testing.B) {
537+
s := NewStore(nullSink{}, false)
538+
t := time.NewTicker(time.Hour) // don't flush
539+
defer t.Stop()
540+
go s.Start(t)
541+
names := new([2048]string)
542+
for i := 0; i < len(names); i++ {
543+
names[i] = "counter_" + strconv.Itoa(i)
544+
}
545+
b.ResetTimer()
546+
b.RunParallel(func(pb *testing.PB) {
547+
for i := 0; pb.Next(); i++ {
548+
s.NewCounter(names[i%len(names)])
549+
}
550+
})
551+
}
552+
553+
func BenchmarkStoreFlush(b *testing.B) {
554+
s := NewStore(nullSink{}, false)
555+
t := time.NewTicker(time.Hour) // don't flush automatically
556+
defer t.Stop()
557+
go s.Start(t)
558+
559+
counters := new([2048]*counter)
560+
gauges := new([2048]*gauge)
561+
timers := new([2048]*timer)
562+
563+
for i := 0; i < len(counters); i++ {
564+
id := strconv.Itoa(i)
565+
counters[i] = s.NewCounter("counter_" + id).(*counter)
566+
counters[i].Set(1)
567+
gauges[i] = s.NewGauge("gauge_" + id).(*gauge)
568+
gauges[i].Set(1)
569+
timers[i] = s.NewTimer("timer_" + id).(*timer)
570+
}
571+
b.ResetTimer()
572+
573+
for i, n := 0, 0; i < b.N; i++ {
574+
s.Flush()
575+
if n++; n == unusedMetricPruneCount {
576+
b.StopTimer()
577+
for i := 0; i < len(counters); i++ {
578+
counters[i].currentValue = uint64(i)
579+
counters[i].zeroCount = 0
580+
timers[i].active = 1
581+
}
582+
b.StartTimer()
583+
n = 0
584+
}
585+
}
586+
}

0 commit comments

Comments
 (0)