1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -77,6 +77,7 @@
* [ENHANCEMENT] Upgrade build image and Go version to 1.24.6. #6970 #6976
* [ENHANCEMENT] Implement versioned transactions for writes to DynamoDB ring. #6986
* [ENHANCEMENT] Add source metadata to requests (API vs ruler). #6947
* [ENHANCEMENT] Add new metric `cortex_discarded_series` to track the number of series with discarded samples. #6995
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
10 changes: 10 additions & 0 deletions pkg/ingester/ingester.go
@@ -1259,22 +1259,27 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
switch cause := errors.Cause(err); {
case errors.Is(cause, storage.ErrOutOfBounds):
sampleOutOfBoundsCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfBounds, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrOutOfOrderSample):
sampleOutOfOrderCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleOutOfOrder, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
newValueForTimestampCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(newValueForTimestamp, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, storage.ErrTooOldSample):
sampleTooOldCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(sampleTooOld, userID, copiedLabels.Hash())
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })

case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
perUserSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perUserSeriesLimit, userID, copiedLabels.Hash())
updateFirstPartial(func() error {
return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause, copiedLabels))
})
@@ -1287,12 +1292,17 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte

case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
perMetricSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perMetricSeriesLimit, userID, copiedLabels.Hash())
updateFirstPartial(func() error {
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause, copiedLabels))
})

case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
perLabelSetSeriesLimitCount++
go i.validateMetrics.DiscardedSeriesTracker.Track(perLabelsetSeriesLimit, userID, copiedLabels.Hash())
for _, matchedLabelset := range matchedLabelSetLimits {
go i.validateMetrics.DiscardedSeriesPerLabelsetTracker.Track(userID, copiedLabels.Hash(), matchedLabelset.Hash, matchedLabelset.Id)
}
// We only track per-labelset discarded samples for throttling by the labelset limit.
reasonCounter.increment(matchedLabelSetLimits, perLabelsetSeriesLimit)
updateFirstPartial(func() error {
140 changes: 140 additions & 0 deletions pkg/util/discardedseries/perlabelset_tracker.go
@@ -0,0 +1,140 @@
package discardedseries

import (
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
)

const (
perLabelsetSeriesLimit = "per_labelset_series_limit"
)

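// labelsetCounterStruct maps a labelset limit hash to the set of series discarded under that labelset for a single user.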
type labelsetCounterStruct struct {
*sync.RWMutex
labelsetSeriesMap map[uint64]*seriesCounterStruct
}

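// DiscardedSeriesPerLabelsetTracker tracks, per user and per matched labelset limit, the distinct series that had samples discarded, and exposes the counts through the supplied gauge.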
type DiscardedSeriesPerLabelsetTracker struct {
*sync.RWMutex
userLabelsetMap map[string]*labelsetCounterStruct
discardedSeriesPerLabelsetGauge *prometheus.GaugeVec
}

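// NewDiscardedSeriesPerLabelsetTracker returns a tracker that vends per-labelset discarded-series counts through discardedSeriesPerLabelsetGauge.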
func NewDiscardedSeriesPerLabelsetTracker(discardedSeriesPerLabelsetGauge *prometheus.GaugeVec) *DiscardedSeriesPerLabelsetTracker {
tracker := &DiscardedSeriesPerLabelsetTracker{
RWMutex: &sync.RWMutex{},
userLabelsetMap: make(map[string]*labelsetCounterStruct),
discardedSeriesPerLabelsetGauge: discardedSeriesPerLabelsetGauge,
}
return tracker
}

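// Track records that the series with the given hash had a sample discarded because it matched the labelset limit identified by matchedLabelsetHash and matchedLabelsetId. Nested maps are created lazily with double-checked locking so the hot path only takes read locks.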
func (t *DiscardedSeriesPerLabelsetTracker) Track(user string, series uint64, matchedLabelsetHash uint64, matchedLabelsetId string) {
t.RLock()
labelsetCounter, ok := t.userLabelsetMap[user]
t.RUnlock()
if !ok {
t.Lock()
labelsetCounter, ok = t.userLabelsetMap[user]
if !ok {
labelsetCounter = &labelsetCounterStruct{
RWMutex: &sync.RWMutex{},
labelsetSeriesMap: make(map[uint64]*seriesCounterStruct),
}
t.userLabelsetMap[user] = labelsetCounter
}
t.Unlock()
}

labelsetCounter.RLock()
seriesCounter, ok := labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
labelsetCounter.RUnlock()
if !ok {
labelsetCounter.Lock()
seriesCounter, ok = labelsetCounter.labelsetSeriesMap[matchedLabelsetHash]
if !ok {
seriesCounter = &seriesCounterStruct{
RWMutex: &sync.RWMutex{},
seriesCountMap: make(map[uint64]struct{}),
labelsetId: matchedLabelsetId,
}
labelsetCounter.labelsetSeriesMap[matchedLabelsetHash] = seriesCounter
}
labelsetCounter.Unlock()
}

seriesCounter.RLock()
_, ok = seriesCounter.seriesCountMap[series]
seriesCounter.RUnlock()
if !ok {
seriesCounter.Lock()
_, ok = seriesCounter.seriesCountMap[series]
if !ok {
seriesCounter.seriesCountMap[series] = struct{}{}
}
seriesCounter.Unlock()
}
}

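// UpdateMetrics publishes the number of distinct series tracked for each user and labelset since the last call, resets the series sets, and prunes labelsets that recorded nothing in the last interval as well as users left with no labelsets.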
func (t *DiscardedSeriesPerLabelsetTracker) UpdateMetrics() {
usersToDelete := make([]string, 0)
t.RLock()
for user, labelsetCounter := range t.userLabelsetMap {
// Reset for each user so labelset hashes queued for deletion do not leak across users.
labelsetsToDelete := make([]uint64, 0)
labelsetCounter.RLock()
if len(labelsetCounter.labelsetSeriesMap) == 0 {
usersToDelete = append(usersToDelete, user)
}
for labelsetHash, seriesCounter := range labelsetCounter.labelsetSeriesMap {
seriesCounter.Lock()
count := len(seriesCounter.seriesCountMap)
t.discardedSeriesPerLabelsetGauge.WithLabelValues(perLabelsetSeriesLimit, user, seriesCounter.labelsetId).Set(float64(count))
clear(seriesCounter.seriesCountMap)
if count == 0 {
labelsetsToDelete = append(labelsetsToDelete, labelsetHash)
}
seriesCounter.Unlock()
}
labelsetCounter.RUnlock()
if len(labelsetsToDelete) > 0 {
labelsetCounter.Lock()
for _, labelsetHash := range labelsetsToDelete {
if _, ok := labelsetCounter.labelsetSeriesMap[labelsetHash]; ok {
labelsetId := labelsetCounter.labelsetSeriesMap[labelsetHash].labelsetId
t.discardedSeriesPerLabelsetGauge.DeleteLabelValues(perLabelsetSeriesLimit, user, labelsetId)
delete(labelsetCounter.labelsetSeriesMap, labelsetHash)
}
}
labelsetCounter.Unlock()
}
}
t.RUnlock()
if len(usersToDelete) > 0 {
t.Lock()
for _, user := range usersToDelete {
delete(t.userLabelsetMap, user)
}
t.Unlock()
}
}

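// StartVendDiscardedSeriesMetricGoroutine starts a background goroutine that calls UpdateMetrics every vendMetricsInterval.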
func (t *DiscardedSeriesPerLabelsetTracker) StartVendDiscardedSeriesMetricGoroutine() {
go func() {
ticker := time.NewTicker(vendMetricsInterval)
for range ticker.C {
t.UpdateMetrics()
}
}()
}

// getSeriesCount is only used in tests; it reads the maps without taking locks.
func (t *DiscardedSeriesPerLabelsetTracker) getSeriesCount(user string, labelsetLimitHash uint64) int {
if labelsetCounter, ok := t.userLabelsetMap[user]; ok {
if seriesCounter, ok := labelsetCounter.labelsetSeriesMap[labelsetLimitHash]; ok {
return len(seriesCounter.seriesCountMap)
}
}
return 0
}
118 changes: 118 additions & 0 deletions pkg/util/discardedseries/perlabelset_tracker_test.go
@@ -0,0 +1,118 @@
package discardedseries

import (
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
)

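// TestPerLabelsetDiscardedSeriesTracker checks that tracked series are vended to the gauge on the next UpdateMetrics call, that counts reset between intervals, and that gauge entries for idle labelsets are removed.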
func TestPerLabelsetDiscardedSeriesTracker(t *testing.T) {
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "cortex_discarded_series_per_labelset",
Help: "The number of series that include discarded samples for each labelset.",
},
[]string{"reason", "user", "labelset"},
)

tracker := NewDiscardedSeriesPerLabelsetTracker(gauge)
user1 := "user1"
user2 := "user2"
series1 := labels.FromStrings("__name__", "1")
series2 := labels.FromStrings("__name__", "2")
labelset1 := uint64(10)
labelset2 := uint64(20)
labelset3 := uint64(30)
labelsetId1 := "ten"
labelsetId2 := "twenty"
labelsetId3 := "thirty"

tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user1, series1.Hash(), labelset2, labelsetId2)

tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user2, series2.Hash(), labelset1, labelsetId1)

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 2)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)
tracker.Track(user1, series1.Hash(), labelset1, labelsetId1)

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 1)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 2)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 1)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)

tracker.UpdateMetrics()

require.Equal(t, tracker.getSeriesCount(user1, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user1, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user1, labelsetId3, 0)

require.Equal(t, tracker.getSeriesCount(user2, labelset1), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset2), 0)
require.Equal(t, tracker.getSeriesCount(user2, labelset3), 0)

comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId1, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId2, 0)
comparePerLabelsetSeriesVendedCount(t, gauge, user2, labelsetId3, 0)
}

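// comparePerLabelsetSeriesVendedCount asserts the current gauge value for the given user and labelset limit id.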
func comparePerLabelsetSeriesVendedCount(t *testing.T, gaugeVec *prometheus.GaugeVec, user string, labelsetLimitId string, val int) {
gauge, _ := gaugeVec.GetMetricWithLabelValues("per_labelset_series_limit", user, labelsetLimitId)
require.Equal(t, testutil.ToFloat64(gauge), float64(val))
}