Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] Ingester: Add experimental `-ingester.local-limit-cache-enabled` flag to prevent false per-user series limit throttling during ingester scale-up. When enabled, the local limit (derived from `global_limit / num_ingesters * replication_factor`) is cached per tenant and prevented from shrinking when the global limit has not changed. This may cause temporary over-commitment of the global series limit until head compaction (~2h). #7491
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
Expand Down
7 changes: 7 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3856,6 +3856,13 @@ instance_limits:
# CLI flag: -ingester.enable-regex-matcher-limits
[enable_regex_matcher_limits: <boolean> | default = false]

# [Experimental] When enabled, the per-ingester local series limit is cached and
# prevented from shrinking during ring topology changes if the global limit has
# not changed. This prevents false throttling during ingester scale-up at the
# cost of potential temporary over-commitment until head compaction.
# CLI flag: -ingester.local-limit-cache-enabled
[local_limit_cache_enabled: <boolean> | default = false]

query_protection:
rejection:
threshold:
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,6 @@ Currently experimental features are:
- `-validation.max-label-cardinality-for-unoptimized-regex` (int) - maximum label cardinality
- `-validation.max-total-label-value-length-for-unoptimized-regex` (int) - maximum total length of all label values in bytes
- HATracker: `-distributor.ha-tracker.enable-startup-sync` (bool) - If enabled, fetches all tracked keys on startup to populate the local cache.
- Ingester: Local Limit Cache
- Enable local limit caching during ring topology changes via `-ingester.local-limit-cache-enabled=true`
- When enabled, the per-ingester local series limit is prevented from shrinking during scale-up if the global limit has not changed. This avoids false throttling but may cause temporary over-commitment until head compaction (~2h).
13 changes: 13 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ type Config struct {
// for unoptimized regex matchers, and enforce per-tenant limits if configured.
EnableRegexMatcherLimits bool `yaml:"enable_regex_matcher_limits"`

// LocalLimitCacheEnabled prevents the per-ingester local series limit from shrinking
// during ring topology changes (e.g., ingester scale-up). When enabled, the limiter
// caches the previous local limit per tenant and prevents it from decreasing when the
// global limit has not changed. The cache is reset per-tenant after head compaction.
LocalLimitCacheEnabled bool `yaml:"local_limit_cache_enabled"`

QueryProtection configs.QueryProtection `yaml:"query_protection"`
}

Expand Down Expand Up @@ -208,6 +214,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.SkipMetadataLimits, "ingester.skip-metadata-limits", true, "If enabled, the metadata API returns all metadata regardless of the limits.")
f.BoolVar(&cfg.EnableMatcherOptimization, "ingester.enable-matcher-optimization", false, "Enable optimization of label matchers when query chunks. When enabled, matchers with low selectivity such as =~.+ are applied lazily during series scanning instead of being used for postings matching.")
f.BoolVar(&cfg.EnableRegexMatcherLimits, "ingester.enable-regex-matcher-limits", false, "Enable regex matcher limits and metrics collection for unoptimized regex queries. When enabled, the ingester will track pattern length, label cardinality, and total value length for unoptimized regex matchers.")
f.BoolVar(&cfg.LocalLimitCacheEnabled, "ingester.local-limit-cache-enabled", false, "[Experimental] When enabled, the per-ingester local series limit is cached and prevented from shrinking during ring topology changes if the global limit has not changed. This prevents false throttling during ingester scale-up at the cost of potential temporary over-commitment until head compaction.")
cfg.DefaultLimits.RegisterFlagsWithPrefix(f, "ingester.")
cfg.QueryProtection.RegisterFlagsWithPrefix(f, "ingester.")
}
Expand Down Expand Up @@ -851,6 +858,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
cfg.AdminLimitMessage,
cfg.LocalLimitCacheEnabled,
)

i.TSDBState.shipperIngesterID = i.lifecycler.ID
Expand Down Expand Up @@ -904,6 +912,7 @@ func NewForFlusher(cfg Config, limits *validation.Overrides, registerer promethe
cfg.LifecyclerConfig.RingConfig.ReplicationFactor,
cfg.LifecyclerConfig.RingConfig.ZoneAwarenessEnabled,
cfg.AdminLimitMessage,
cfg.LocalLimitCacheEnabled,
)
i.metrics = newIngesterMetrics(registerer,
false,
Expand Down Expand Up @@ -3331,6 +3340,10 @@ func (i *Ingester) compactBlocks(ctx context.Context, force bool, allowed *users
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err, "compactReason", reason)
} else {
level.Debug(logutil.WithContext(ctx, i.logger)).Log("msg", "TSDB blocks compaction completed successfully", "user", userID, "compactReason", reason)
// Reset the local limit cache after successful compaction.
// Idle series (including those resharded to other ingesters) are now
// flushed from the head, so the series count reflects true ownership.
i.limiter.ResetLocalLimitCache(userID)
}

return nil
Expand Down
61 changes: 60 additions & 1 deletion pkg/ingester/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"fmt"
"math"
"sync"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -33,6 +34,15 @@ type RingCount interface {
ZonesCount() int
}

// localLimitEntry stores a previously computed local limit along with the
// global limit that was used to derive it. This is used to prevent the local
// limit from shrinking during ingester scale-up when the global limit has not
// changed.
type localLimitEntry struct {
	localLimit  int // last local limit computed for the tenant (globalLimit / numIngesters * replicationFactor)
	globalLimit int // global limit that was in effect when localLimit was computed
}

// Limiter implements primitives to get the maximum number of series
// an ingester can handle for a specific tenant
type Limiter struct {
Expand All @@ -43,6 +53,21 @@ type Limiter struct {
shardByAllLabels bool
zoneAwarenessEnabled bool
AdminLimitMessage string

// localLimitCacheEnabled gates the local limit caching behavior that prevents
// false throttling during ingester scale-up. When enabled, the limiter caches
// the previous local limit per tenant and prevents it from shrinking when the
// global limit has not changed. This avoids rejecting writes for tenants who
// are within their global quota but whose local limit dropped due to ring growth.
//
// Note: enabling this may cause temporary over-commitment of the global series
// limit until head compaction redistributes series to new ingesters (~2h).
localLimitCacheEnabled bool

// prevLocalLimits caches the previous local limit per user to prevent
// false throttling during ingester ring topology changes.
prevLocalLimitsMu sync.RWMutex
prevLocalLimits map[string]localLimitEntry
}

// NewLimiter makes a new in-memory series limiter
Expand All @@ -54,6 +79,7 @@ func NewLimiter(
replicationFactor int,
zoneAwarenessEnabled bool,
AdminLimitMessage string,
localLimitCacheEnabled bool,
) *Limiter {
return &Limiter{
limits: limits,
Expand All @@ -63,6 +89,8 @@ func NewLimiter(
shardByAllLabels: shardByAllLabels,
zoneAwarenessEnabled: zoneAwarenessEnabled,
AdminLimitMessage: AdminLimitMessage,
localLimitCacheEnabled: localLimitCacheEnabled,
prevLocalLimits: make(map[string]localLimitEntry),
}
}

Expand Down Expand Up @@ -333,7 +361,38 @@ func (l *Limiter) convertGlobalToLocalLimit(userID string, globalLimit int) int
numIngesters = min(numIngesters, util.ShuffleShardExpectedInstances(shardSize, l.getNumZones()))
}

return int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))
newLimit := int((float64(globalLimit) / float64(numIngesters)) * float64(l.replicationFactor))

// Prevent the local limit from shrinking when the global limit has not changed.
// During ingester scale-up, numIngesters increases before series redistribute
// (which happens at head compaction), causing the local limit to drop below the
// series count an ingester already holds. This results in false throttling for
// tenants who are within their global limit.
//
// We only cache when the global limit is unchanged — if the global limit changes
// (increase or decrease), we always recalculate to respect the new configuration.
if l.localLimitCacheEnabled {
l.prevLocalLimitsMu.RLock()
prev, ok := l.prevLocalLimits[userID]
l.prevLocalLimitsMu.RUnlock()
if ok && newLimit < prev.localLimit && globalLimit == prev.globalLimit {
return prev.localLimit
}
l.prevLocalLimitsMu.Lock()
l.prevLocalLimits[userID] = localLimitEntry{localLimit: newLimit, globalLimit: globalLimit}
l.prevLocalLimitsMu.Unlock()
}
return newLimit
}

// ResetLocalLimitCache clears the cached local limit for a specific user.
// This should be called after the user's TSDB head compaction when series have
// been redistributed and the ingester's actual series count for this user
// reflects its true post-resharding ownership.
func (l *Limiter) ResetLocalLimitCache(userID string) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If tenants are deleted, their entries stay forever. right?
I think we need to call this from closeAndDeleteUserTSDBIfIdle

l.prevLocalLimitsMu.Lock()
delete(l.prevLocalLimits, userID)
l.prevLocalLimitsMu.Unlock()
}

func (l *Limiter) getShardSize(userID string) int {
Expand Down
136 changes: 127 additions & 9 deletions pkg/ingester/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,12 @@ func runLimiterMaxFunctionTest(
overrides := validation.NewOverrides(limits, nil)

// Assert on default sharding strategy.
limiter := NewLimiter(overrides, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "")
limiter := NewLimiter(overrides, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "", false)
actual := runMaxFn(limiter)
assert.Equal(t, testData.expectedDefaultSharding, actual)

// Assert on shuffle sharding strategy.
limiter = NewLimiter(overrides, ring, util.ShardingStrategyShuffle, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "")
limiter = NewLimiter(overrides, ring, util.ShardingStrategyShuffle, testData.shardByAllLabels, testData.ringReplicationFactor, testData.ringZoneAwarenessEnabled, "", false)
actual = runMaxFn(limiter)
assert.Equal(t, testData.expectedShuffleSharding, actual)
})
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestLimiter_AssertMaxSeriesPerMetric(t *testing.T) {
MaxGlobalSeriesPerMetric: testData.maxGlobalSeriesPerMetric,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxSeriesPerMetric("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -360,7 +360,7 @@ func TestLimiter_AssertMaxMetadataPerMetric(t *testing.T) {
MaxGlobalMetadataPerMetric: testData.maxGlobalMetadataPerMetric,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxMetadataPerMetric("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestLimiter_AssertMaxSeriesPerUser(t *testing.T) {
MaxGlobalSeriesPerUser: testData.maxGlobalSeriesPerUser,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxSeriesPerUser("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestLimiter_AssertMaxNativeHistogramsSeriesPerUser(t *testing.T) {
MaxGlobalNativeHistogramSeriesPerUser: testData.maxGlobalNativeHistogramsSeriesPerUser,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxNativeHistogramSeriesPerUser("test", testData.series)

assert.Equal(t, testData.expected, actual)
Expand Down Expand Up @@ -562,7 +562,7 @@ func TestLimiter_AssertMaxSeriesPerLabelSet(t *testing.T) {
// Mock limits
limits := validation.NewOverrides(testData.limits, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxSeriesPerLabelSet("test", labels.FromStrings("foo", "bar"), func(limits []validation.LimitsPerLabelSet, limit validation.LimitsPerLabelSet) (int, error) {
return testData.series, nil
})
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestLimiter_AssertMaxMetricsWithMetadataPerUser(t *testing.T) {
MaxGlobalMetricsWithMetadataPerUser: testData.maxGlobalMetadataPerUser,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, testData.shardByAllLabels, testData.ringReplicationFactor, false, "", false)
actual := limiter.AssertMaxMetricsWithMetadataPerUser("test", testData.metadata)

assert.Equal(t, testData.expected, actual)
Expand All @@ -648,7 +648,7 @@ func TestLimiter_FormatError(t *testing.T) {
MaxGlobalMetadataPerMetric: 3,
}, nil)

limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, true, 3, false, "please contact administrator to raise it")
limiter := NewLimiter(limits, ring, util.ShardingStrategyDefault, true, 3, false, "please contact administrator to raise it", false)
lbls := labels.FromStrings(labels.MetricName, "testMetric")

actual := limiter.FormatError("user-1", errMaxSeriesPerUserLimitExceeded, lbls)
Expand Down Expand Up @@ -727,3 +727,121 @@ func (m *ringCountMock) ZonesCount() int {
args := m.Called()
return args.Int(0)
}

func TestLimiter_convertGlobalToLocalLimit_CacheDuringScaleUp(t *testing.T) {
	// Each case runs the conversion twice: once against the initial ring size,
	// then again after the ring has been resized, asserting how the local
	// limit cache reacts to the topology change.
	for name, tc := range map[string]struct {
		initialIngesterCount int
		scaledIngesterCount  int
		globalLimit          int
		replicationFactor    int
		expectedFirstLimit   int
		expectedSecondLimit  int
	}{
		"local limit should not shrink when global limit unchanged during scale-up": {
			initialIngesterCount: 75,
			scaledIngesterCount:  249,
			globalLimit:          2700000,
			replicationFactor:    3,
			expectedFirstLimit:   108000, // 2700000 / 75 * 3
			expectedSecondLimit:  108000, // served from cache instead of 32530
		},
		"local limit should increase when ingesters decrease (scale-down)": {
			initialIngesterCount: 249,
			scaledIngesterCount:  75,
			globalLimit:          2700000,
			replicationFactor:    3,
			expectedFirstLimit:   32530,  // 2700000 / 249 * 3
			expectedSecondLimit:  108000, // 2700000 / 75 * 3 (higher, so updated)
		},
		"local limit should recalculate when global limit changes": {
			initialIngesterCount: 75,
			scaledIngesterCount:  249,
			globalLimit:          5000000, // changed from initial
			replicationFactor:    3,
			expectedFirstLimit:   108000, // 2700000 / 75 * 3 (with initial global)
			expectedSecondLimit:  60240,  // 5000000 / 249 * 3 (recalculated)
		},
	} {
		t.Run(name, func(t *testing.T) {
			// Ring reporting the pre-scale instance count.
			before := &ringCountMock{}
			before.On("HealthyInstancesCount").Return(tc.initialIngesterCount).Once()
			before.On("ZonesCount").Return(1)

			limiter := NewLimiter(
				nil, before, "", true, tc.replicationFactor, false, "", true,
			)

			// First conversion seeds the cache using the initial global limit.
			initialGlobal := 2700000
			assert.Equal(t, tc.expectedFirstLimit, limiter.convertGlobalToLocalLimit("test-user", initialGlobal))

			// Swap in a ring reporting the post-scale instance count.
			after := &ringCountMock{}
			after.On("HealthyInstancesCount").Return(tc.scaledIngesterCount)
			after.On("ZonesCount").Return(1)
			limiter.ring = after

			// Second conversion observes the resized ring.
			assert.Equal(t, tc.expectedSecondLimit, limiter.convertGlobalToLocalLimit("test-user", tc.globalLimit))
		})
	}
}

func TestLimiter_convertGlobalToLocalLimit_CacheResetOnHeadCompaction(t *testing.T) {
	small := &ringCountMock{}
	small.On("HealthyInstancesCount").Return(75)
	small.On("ZonesCount").Return(1)

	limiter := NewLimiter(nil, small, "", true, 3, false, "", true)

	// Seed the cache with the pre-scale local limit: 2700000 / 75 * 3.
	assert.Equal(t, 108000, limiter.convertGlobalToLocalLimit("test-user", 2700000))

	// Grow the ring from 75 to 249 ingesters.
	large := &ringCountMock{}
	large.On("HealthyInstancesCount").Return(249)
	large.On("ZonesCount").Return(1)
	limiter.ring = large

	// The cached value shields test-user from the shrunken limit.
	assert.Equal(t, 108000, limiter.convertGlobalToLocalLimit("test-user", 2700000))

	// other-user has no earlier (higher) cached entry, so it gets the fresh
	// value for the larger ring: 2700000 / 249 * 3.
	assert.Equal(t, 32530, limiter.convertGlobalToLocalLimit("other-user", 2700000))

	// Head compaction completed for test-user only: drop its cache entry.
	limiter.ResetLocalLimitCache("test-user")

	// test-user now recomputes against the larger ring.
	assert.Equal(t, 32530, limiter.convertGlobalToLocalLimit("test-user", 2700000))

	// The reset was scoped to test-user; other-user's result is unchanged
	// (it never held a higher cached value to begin with).
	assert.Equal(t, 32530, limiter.convertGlobalToLocalLimit("other-user", 2700000))
}

func TestLimiter_convertGlobalToLocalLimit_GlobalLimitDecrease(t *testing.T) {
	ringMock := &ringCountMock{}
	ringMock.On("HealthyInstancesCount").Return(100)
	ringMock.On("ZonesCount").Return(1)

	limiter := NewLimiter(nil, ringMock, "", true, 3, false, "", true)

	// Establish a cache entry at the higher global limit: 5000000 / 100 * 3.
	assert.Equal(t, 150000, limiter.convertGlobalToLocalLimit("test-user", 5000000))

	// A lowered global limit (e.g. customer downgrade) must bypass the cache
	// and take effect immediately: 2000000 / 100 * 3.
	assert.Equal(t, 60000, limiter.convertGlobalToLocalLimit("test-user", 2000000))
}
Loading
Loading