Skip to content

Dynamic splitting by interval for range queries #6458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b874e4e
add limit for range query max splits by interval
afhassan Dec 24, 2024
6106978
Change dynamic interval sharding to take into account vertical sharding
afhassan Dec 31, 2024
01c121c
add dynamic sharding based on total days of data fetched for query
afhassan Jan 16, 2025
b15dde6
add unit tests for dynamicIntervalFn
afhassan Jan 17, 2025
a1e7a3a
allow using any base interval size for dynamicIntervalFn
afhassan Jan 22, 2025
08ec706
add dynamic query splits to experimental features
afhassan Jan 22, 2025
7349cfa
rename dynamicIntervalFn unit tests
afhassan Jan 23, 2025
3e95b45
refactor dynamicIntervalFn to be more readable
afhassan Jan 24, 2025
cfc5078
add unit tests for getIntervalFromMaxSplits
afhassan Jan 27, 2025
83efc5c
add unit tests for analyzeDurationFetchedByQuery
afhassan Jan 28, 2025
427f5b2
fix formatting
afhassan Jan 28, 2025
c931aa6
update docs
afhassan Jan 28, 2025
c1ae047
update experimental features
afhassan Jan 28, 2025
ba90bf0
fix dynamic splitting when query range is shorter than base interval
afhassan Jan 31, 2025
8385814
refactor dynamic query splitting into smaller helper functions
afhassan Feb 6, 2025
9225d23
use duration instead of int for dynamic query splitting calculation
afhassan Feb 8, 2025
bea70ad
add comments for getIntervalFromMaxSplits
afhassan Feb 9, 2025
df492d8
add ceilDiv helper function
afhassan Feb 9, 2025
ce661ed
add default max splits by duration fetched
afhassan Feb 11, 2025
701d393
add new unit tests for helper functions
afhassan Feb 11, 2025
2f17a99
add changelog
afhassan Feb 14, 2025
bb1c15b
Merge branch 'master' into master
afhassan Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4193,6 +4193,22 @@ The `query_range_config` configures the query splitting and caching in the Corte
# CLI flag: -querier.split-queries-by-interval
[split_queries_by_interval: <duration> | default = 0s]

dynamic_query_splits:
# [EXPERIMENTAL] Maximum number of splits for a query, 0 disables it.
# Dynamically uses a multiple of split interval to maintain a total number of
# splits below the set value. If vertical sharding is enabled for a query, the
# combined total number of vertical and interval splits is kept below this
# value.
# CLI flag: -querier.max-splits-per-query
[max_splits_per_query: <int> | default = 0]

# [EXPERIMENTAL] Max total duration of data fetched from storage by all query
# splits, 0 disables it. Dynamically uses a multiple of split interval to
# maintain a total fetched duration of data lower than the value set. It takes
# into account additional duration fetched by matrix selectors and subqueries.
# CLI flag: -querier.max-fetched-storage-data-duration-per-query
[max_fetched_storage_data_duration_per_query: <duration> | default = 0s]

# Mutate incoming queries to align their start and end with their step.
# CLI flag: -querier.align-querier-with-step
[align_queries_with_step: <boolean> | default = false]
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,6 @@ Currently experimental features are:
- Enable string interning for metrics labels by setting `-ingester.labels-string-interning-enabled` on Ingester.
- Query-frontend: query rejection (`-frontend.query-rejection.enabled`)
- Querier: protobuf codec (`-api.querier-default-codec`)
- Query-frontend: dynamic query splits
- `querier.max-shards-per-query` (int) CLI flag
- `querier.max-duration-of-data-fetched-from-storage-per-query` (duration) CLI flag
5 changes: 5 additions & 0 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
splitQueries := stats.LoadSplitQueries()
dataSelectMaxTime := stats.LoadDataSelectMaxTime()
dataSelectMinTime := stats.LoadDataSelectMinTime()
splitInterval := stats.LoadSplitInterval()

// Track stats.
f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
Expand Down Expand Up @@ -425,6 +426,10 @@ func (f *Handler) reportQueryStats(r *http.Request, userID string, queryString u
logMessage = append(logMessage, "query_storage_wall_time_seconds", sws)
}

if splitInterval > 0 {
logMessage = append(logMessage, "split_interval", splitInterval.String())
}

if error != nil {
s, ok := status.FromError(error)
if !ok {
Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type QueryStats struct {
Priority int64
DataSelectMaxTime int64
DataSelectMinTime int64
SplitInterval time.Duration
m sync.Mutex
}

Expand Down Expand Up @@ -287,6 +288,14 @@ func (s *QueryStats) LoadDataSelectMinTime() int64 {
return atomic.LoadInt64(&s.DataSelectMinTime)
}

func (s *QueryStats) LoadSplitInterval() time.Duration {
if s == nil {
return 0
}

return s.SplitInterval
}

func (s *QueryStats) AddStoreGatewayTouchedPostings(count uint64) {
if s == nil {
return
Expand Down
9 changes: 5 additions & 4 deletions pkg/querier/tripperware/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,10 @@ func TestLimitsMiddleware_MaxQueryLength(t *testing.T) {
}

type mockLimits struct {
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
maxQueryLookback time.Duration
maxQueryLength time.Duration
maxCacheFreshness time.Duration
queryVerticalShardSize int
}

func (m mockLimits) MaxQueryLookback(string) time.Duration {
Expand All @@ -255,7 +256,7 @@ func (m mockLimits) MaxCacheFreshness(string) time.Duration {
}

func (m mockLimits) QueryVerticalShardSize(userID string) int {
return 0
return m.queryVerticalShardSize
}

func (m mockLimits) QueryPriority(userID string) validation.QueryPriority {
Expand Down
37 changes: 30 additions & 7 deletions pkg/querier/tripperware/queryrange/query_range_middlewares.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@ const day = 24 * time.Hour

// Config for query_range middleware chain.
type Config struct {
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// Query splits config
SplitQueriesByInterval time.Duration `yaml:"split_queries_by_interval"`
DynamicQuerySplitsConfig DynamicQuerySplitsConfig `yaml:"dynamic_query_splits"`

AlignQueriesWithStep bool `yaml:"align_queries_with_step"`
ResultsCacheConfig `yaml:"results_cache"`
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
// List of headers which query_range middleware chain would forward to downstream querier.
ForwardHeaders flagext.StringSlice `yaml:"forward_headers_list"`

Expand All @@ -54,6 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.Var(&cfg.ForwardHeaders, "frontend.forward-headers-list", "List of headers forwarded by the query Frontend to downstream querier.")
cfg.ResultsCacheConfig.RegisterFlags(f)
cfg.DynamicQuerySplitsConfig.RegisterFlags(f)
}

// Validate validates the config.
Expand All @@ -66,9 +70,25 @@ func (cfg *Config) Validate(qCfg querier.Config) error {
return errors.Wrap(err, "invalid ResultsCache config")
}
}
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
if cfg.SplitQueriesByInterval <= 0 {
return errors.New("configs under dynamic-query-splits requires that a value for split-queries-by-interval is set.")
}
}
return nil
}

type DynamicQuerySplitsConfig struct {
MaxSplitsPerQuery int `yaml:"max_splits_per_query"`
MaxFetchedStorageDataDurationPerQuery time.Duration `yaml:"max_fetched_storage_data_duration_per_query"`
}

// RegisterFlags registers flags foy dynamic query splits
func (cfg *DynamicQuerySplitsConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxSplitsPerQuery, "querier.max-splits-per-query", 0, "[EXPERIMENTAL] Maximum number of splits for a query, 0 disables it. Dynamically uses a multiple of split interval to maintain a total number of splits below the set value. If vertical sharding is enabled for a query, the combined total number of vertical and interval splits is kept below this value.")
f.DurationVar(&cfg.MaxFetchedStorageDataDurationPerQuery, "querier.max-fetched-storage-data-duration-per-query", 0, "[EXPERIMENTAL] Max total duration of data fetched from storage by all query splits, 0 disables it. Dynamically uses a multiple of split interval to maintain a total fetched duration of data lower than the value set. It takes into account additional duration fetched by matrix selectors and subqueries.")
}

// Middlewares returns list of middlewares that should be applied for range query.
func Middlewares(
cfg Config,
Expand All @@ -89,8 +109,11 @@ func Middlewares(
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("step_align", metrics), StepAlignMiddleware)
}
if cfg.SplitQueriesByInterval != 0 {
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval }
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer))
intervalFn := staticIntervalFn(cfg)
if cfg.DynamicQuerySplitsConfig.MaxSplitsPerQuery > 0 || cfg.DynamicQuerySplitsConfig.MaxFetchedStorageDataDurationPerQuery > 0 {
intervalFn = dynamicIntervalFn(cfg, limits, queryAnalyzer, lookbackDelta)
}
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(intervalFn, limits, prometheusCodec, registerer, lookbackDelta))
}

var c cache.Cache
Expand Down
Loading
Loading