Skip to content

Commit ad49b2e

Browse files
authored
Dynamic splitting by interval for range queries (#6458)
* add limit for range query max splits by interval Signed-off-by: Ahmed Hassan <[email protected]> * Change dynamic interval sharding to take into account vertical sharding Signed-off-by: Ahmed Hassan <[email protected]> * add dynamic sharding based on total days of data fetched for query Signed-off-by: Ahmed Hassan <[email protected]> * add unit tests for dynamicIntervalFn Signed-off-by: Ahmed Hassan <[email protected]> * allow using any base interval size for dynamicIntervalFn Signed-off-by: Ahmed Hassan <[email protected]> * add dynamic query splits to experimental features Signed-off-by: Ahmed Hassan <[email protected]> * rename dynamicIntervalFn unit tests Signed-off-by: Ahmed Hassan <[email protected]> * refactor dynamicIntervalFn to be more readable Signed-off-by: Ahmed Hassan <[email protected]> * add unit tests for getIntervalFromMaxSplits Signed-off-by: Ahmed Hassan <[email protected]> * add unit tests for analyzeDurationFetchedByQuery Signed-off-by: Ahmed Hassan <[email protected]> * fix formatting Signed-off-by: Ahmed Hassan <[email protected]> * update docs Signed-off-by: Ahmed Hassan <[email protected]> * update experimental features Signed-off-by: Ahmed Hassan <[email protected]> * fix dynamic splitting when query range is shorter than base interval Signed-off-by: Ahmed Hassan <[email protected]> * refactor dynamic query splitting into smaller helper functions Signed-off-by: Ahmed Hassan <[email protected]> * use duration instead of int for dynamic query splitting calculation Signed-off-by: Ahmed Hassan <[email protected]> * add comments for getIntervalFromMaxSplits Signed-off-by: Ahmed Hassan <[email protected]> * add ceilDiv helper function Signed-off-by: Ahmed Hassan <[email protected]> * add default max splits by duration fetched Signed-off-by: Ahmed Hassan <[email protected]> * add new unit tests for helper functions Signed-off-by: Ahmed Hassan <[email protected]> * add changelog Signed-off-by: Ahmed Hassan <[email protected]> --------- Signed-off-by: Ahmed Hassan <[email protected]> Signed-off-by: Ahmed Hassan <[email protected]>
1 parent 2a6dd6b commit ad49b2e

File tree

10 files changed

+1130
-23
lines changed

10 files changed

+1130
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Changelog
22

33
## master / unreleased
4+
* [FEATURE] Query Frontend: Add dynamic interval size for query splitting. This is enabled by configuring experimental flags `querier.max-shards-per-query` and/or `querier.max-fetched-data-duration-per-query`. The split interval size is dynamically increased to maintain a number of shards and total duration fetched below the configured values. #6458
45
* [ENHANCEMENT] Add `compactor.auto-forget-delay` for compactor to auto forget compactors after X minutes without heartbeat. #6533
5-
66
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
77
* [ENHANCEMENT] StoreGateway: Emit more histogram buckets on the `cortex_querier_storegateway_refetches_per_query` metric. #6570
88
* [ENHANCEMENT] Querier: Apply bytes limiter to LabelNames and LabelValuesForLabelNames. #6568

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4276,6 +4276,22 @@ The `query_range_config` configures the query splitting and caching in the Corte
42764276
# CLI flag: -querier.split-queries-by-interval
42774277
[split_queries_by_interval: <duration> | default = 0s]
42784278
4279+
dynamic_query_splits:
4280+
# [EXPERIMENTAL] Maximum number of shards for a query, 0 disables it.
4281+
# Dynamically uses a multiple of split interval to maintain a total number of
4282+
# shards below the set value. If vertical sharding is enabled for a query, the
4283+
# combined total number of interval splits and vertical shards is kept below
4284+
# this value.
4285+
# CLI flag: -querier.max-shards-per-query
4286+
[max_shards_per_query: <int> | default = 0]
4287+
4288+
# [EXPERIMENTAL] Max total duration of data fetched from storage by all query
4289+
# shards, 0 disables it. Dynamically uses a multiple of split interval to
4290+
# maintain a total fetched duration of data lower than the value set. It takes
4291+
# into account additional duration fetched by matrix selectors and subqueries.
4292+
# CLI flag: -querier.max-fetched-data-duration-per-query
4293+
[max_fetched_data_duration_per_query: <duration> | default = 0s]
4294+
42794295
# Mutate incoming queries to align their start and end with their step.
42804296
# CLI flag: -querier.align-querier-with-step
42814297
[align_queries_with_step: <boolean> | default = false]

docs/configuration/v1-guarantees.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,6 @@ Currently experimental features are:
120120
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
121121
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
122122
- Querier: protobuf codec (`-api.querier-default-codec`)
123+
- Query-frontend: dynamic query splits
124+
- `querier.max-shards-per-query` (int) CLI flag
125+
- `querier.max-fetched-data-duration-per-query` (duration) CLI flag

pkg/frontend/transport/handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,7 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
372372
splitQueries := stats.LoadSplitQueries()
373373
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
374374
dataSelectMinTime := stats.LoadDataSelectMinTime()
375+
splitInterval := stats.LoadSplitInterval()
375376

376377
// Track stats.
377378
f.querySeconds.WithLabelValues(source, userID).Add(wallTime.Seconds())
@@ -446,6 +447,10 @@ func (f *Handler) reportQueryStats(r *http.Request, source, userID string, query
446447
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
447448
}
448449

450+
if splitInterval > 0 {
451+
logMessage = append(logMessage, "split_interval", splitInterval.String())
452+
}
453+
449454
if error != nil {
450455
s, ok := status.FromError(error)
451456
if !ok {

pkg/querier/stats/stats.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type QueryStats struct {
2121
Priority int64
2222
DataSelectMaxTime int64
2323
DataSelectMinTime int64
24+
SplitInterval time.Duration
2425
m sync.Mutex
2526
}
2627

@@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
287288
return atomic.LoadInt64(&s.DataSelectMinTime)
288289
}
289290

291+
func (s *QueryStats) LoadSplitInterval() time.Duration {
292+
if s == nil {
293+
return 0
294+
}
295+
296+
return s.SplitInterval
297+
}
298+
290299
func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
291300
if s == nil {
292301
return

pkg/querier/tripperware/queryrange/limits_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,10 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
233233
}
234234

235235
type mockLimits struct {
236-
maxQueryLookback time.Duration
237-
maxQueryLength time.Duration
238-
maxCacheFreshness time.Duration
236+
maxQueryLookback time.Duration
237+
maxQueryLength time.Duration
238+
maxCacheFreshness time.Duration
239+
queryVerticalShardSize int
239240
}
240241

241242
func (m mockLimits) MaxQueryLookback(string) time.Duration {
@@ -255,7 +256,7 @@ func (m mockLimits) MaxCacheFreshness(string) time.Duration {
255256
}
256257

257258
func (m mockLimits) QueryVerticalShardSize(userID string) int {
258-
return 0
259+
return m.queryVerticalShardSize
259260
}
260261

261262
func (m mockLimits) QueryPriority(userID string) validation.QueryPriority {

pkg/querier/tripperware/queryrange/query_range_middlewares.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ const day = 24 * time.Hour
3434

3535
// Config for query_range middleware chain.
3636
type Config struct {
37-
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
38-
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
39-
ResultsCacheConfig `yaml:"results_cache"`
40-
CacheResults bool `yaml:"cache_results"`
41-
MaxRetries int `yaml:"max_retries"`
37+
// Query splits config
38+
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
39+
DynamicQuerySplitsConfig DynamicQuerySplitsConfig `yaml:"dynamic_query_splits"`
40+
41+
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
42+
ResultsCacheConfig `yaml:"results_cache"`
43+
CacheResults bool `yaml:"cache_results"`
44+
MaxRetries int `yaml:"max_retries"`
4245
// List of headers which query_range middleware chain would forward to downstream querier.
4346
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`
4447

@@ -54,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
5457
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
5558
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
5659
cfg.ResultsCacheConfig.RegisterFlags(f)
60+
cfg.DynamicQuerySplitsConfig.RegisterFlags(f)
5761
}
5862

5963
// Validate validates the config.
@@ -66,9 +70,25 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
6670
return errors.Wrap(err, "invalid ResultsCache config")
6771
}
6872
}
73+
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedDataDurationPerQuery > 0 {
74+
if cfg.SplitQueriesByInterval <= 0 {
75+
return errors.New("configs under dynamic-query-splits requires that a value for split-queries-by-interval is set.")
76+
}
77+
}
6978
return nil
7079
}
7180

81+
type DynamicQuerySplitsConfig struct {
82+
MaxShardsPerQuery int `yaml:"max_shards_per_query"`
83+
MaxFetchedDataDurationPerQuery time.Duration `yaml:"max_fetched_data_duration_per_query"`
84+
}
85+
86+
// RegisterFlags registers flags foy dynamic query splits
87+
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
88+
f.IntVar(&cfg.MaxShardsPerQuery, "querier.max-shards-per-query", 0, "[EXPERIMENTAL] Maximum number of shards for a query, 0 disables it. Dynamically uses a multiple of split interval to maintain a total number of shards below the set value. If vertical sharding is enabled for a query, the combined total number of interval splits and vertical shards is kept below this value.")
89+
f.DurationVar(&cfg.MaxFetchedDataDurationPerQuery, "querier.max-fetched-data-duration-per-query", 0, "[EXPERIMENTAL] Max total duration of data fetched from storage by all query shards, 0 disables it. Dynamically uses a multiple of split interval to maintain a total fetched duration of data lower than the value set. It takes into account additional duration fetched by matrix selectors and subqueries.")
90+
}
91+
7292
// Middlewares returns list of middlewares that should be applied for range query.
7393
func Middlewares(
7494
cfg Config,
@@ -89,8 +109,11 @@ func Middlewares(
89109
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
90110
}
91111
if cfg.SplitQueriesByInterval != 0 {
92-
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
93-
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
112+
intervalFn := staticIntervalFn(cfg)
113+
if cfg.DynamicQuerySplitsConfig.MaxShardsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedDataDurationPerQuery > 0 {
114+
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, lookbackDelta)
115+
}
116+
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, lookbackDelta))
94117
}
95118

96119
var c cache.Cache

0 commit comments

Comments
 (0)