-
Notifications
You must be signed in to change notification settings - Fork 817
Dynamic splitting by interval for range queries #6458
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Signed-off-by: Ahmed Hassan <[email protected]>
staticIntervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval } | ||
queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("split_by_interval", metrics), SplitByIntervalMiddleware(staticIntervalFn, limits, prometheusCodec, registerer)) | ||
intervalFn := func(_ tripperware.Request) time.Duration { return cfg.SplitQueriesByInterval } | ||
if cfg.SplitQueriesByIntervalMaxSplits != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the limit be applied to both range splits and vertical spits?
func (s shardBy) Do(ctx context.Context, r Request) (Response, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Technically this sets a limit for the total range and vertical splits for a given query. The number of vertical shards is static, so the max number of of splits for a given query becomes split_queries_by_interval_max_splits
x query_vertical_shard_size
. Because of this adding a separate limit for vertical sharding when the number of vertical shards is a static config would be redundant because we limit it already.
Signed-off-by: Ahmed Hassan <[email protected]>
Instead of changing split interval using max number of split queries, can we try to combine it with estimated data to fetch? For example, a query up[30d] is very expensive to split to 30 splits as each split query still fetches 30 day of data so 30 splits ended up fetching 900 days of data. Instead of having a limit of total splits should we use total days of data to fetch? |
That's a good idea - I can add a new limit for total hours of data fetched and adjust the interval to not exceed it. We can still keep max number of splits since it gives more flexibility to limit the number of shards for queries with long day range even if they don't fetch a lot of days of data like the example you mentioned |
Signed-off-by: Ahmed Hassan <[email protected]>
if err != nil { | ||
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) | ||
} | ||
s.splitByCounter.Add(float64(len(reqs))) | ||
|
||
stats := querier_stats.FromContext(ctx) |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it is only used to log the interval size used for splitting the query.
Signed-off-by: Ahmed Hassan <[email protected]>
I get the idea. But my main concern for such dynamic split interval + max splits by interval is that results cache will have very bad hit ratio as our current results cache key is tied to your split interval.
The first 30 day range query uses 24h interval so 24h will be part of our results cache key. Making vertical shard size dynamic seems more friendly to results cache because vertical shard size is not part of the results cache key. However, not all queries can be vertically sharded. |
Isn't this true today with Grafana modifying the step interval? For example the 30d query will have a step of 900s vs a 40d query will have a step of 1200s. Since the step is also in the cache key, this will already invalidate the cache. I agree with you on changing the vertical shard size first. Could we mark this feature experimental and iterate on it? |
Yeah let's mark it experimental in https://cortexmetrics.io/docs/configuration/v1guarantees/#experimental-features |
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
|
||
// calculates the total duration of data the query will have to fetch from storage as a multiple of baseInterval. | ||
// also returns the total time range fetched by the original query start and end times | ||
func durationFetchedByQuery(expr parser.Expr, req tripperware.Request, queryStoreAfter, lookbackDelta time.Duration, baseInterval time.Duration, now time.Time) (durationFetchedCount int, originalRangeCount int, lookbackDeltaCount int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Follow the convention for adding comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rewrote the comment to be more clear. The convention seems to be general guidelines not a specific format but I tried to follow it as much as possible.
@@ -408,3 +413,189 @@ func Test_evaluateAtModifier(t *testing.T) { | |||
}) | |||
} | |||
} | |||
|
|||
func TestDynamicIntervalFn(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you also add separate tests for durationFetchedByQuery()?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, could you add more tests for split_by_interval with dynamic splits enabled? Maybe in TestSplitByDay()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added tests for both and also added tests for getIntervalFromMaxSplits()
durationFetchedCount = 0 | ||
originalRangeCount = 0 | ||
lookbackDeltaCount = 0 | ||
baseIntervalMillis := util.DurationMilliseconds(baseInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why the func returns as multiples of baseInterval rather than just a duration?
Since the function is called durationFetchedByQuery, I'd expect it to return a duration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The calculations done with these are all based on integers and rounding down is important for the result. I refactored the function to be more readable and changed its name. Let me know if you think we should make more changes to it.
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Thanks @harry671003 for the all the feedback! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for addressing the comments. LGTM
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
n++ | ||
} | ||
} | ||
return n * baseInterval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code is a bit confusing to me. I added a new test case in Test_getIntervalFromMaxSplits
below. Since the total time range is only 23h so I expect it to be split by 1 day. But the result was split by 2 days. Please take a look
{
name: "23h with 10 max splits, expected to split by 1 day",
baseSplitInterval: day,
req: &tripperware.PrometheusRequest{
Start: 12 * 3600 * seconds,
End: 35 * 3600 * seconds,
Step: 5 * 60 * seconds,
Query: "foo",
},
maxSplits: 10,
expectedInterval: 1 * day,
},
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, this part should only run when maxSplits == 1
.
The condition I had was not right. I changed it and added the test case for when query range < interval
if maxSplits == 1 {
// No splitting, interval should be long enough to result in 1 split only
nextSplitStart := nextIntervalBoundary(r.GetStart(), r.GetStep(), n*baseInterval) + r.GetStep()
if nextSplitStart < r.GetEnd() {
queryRangeWithoutFirstSplit := time.Duration((r.GetEnd() - nextSplitStart) * int64(time.Millisecond))
n += (queryRangeWithoutFirstSplit + baseInterval - 1) / baseInterval
}
}
extraIntervalsPerSplit = 1 // avoid division by 0 | ||
} | ||
|
||
// Next analyze the query using the next split start time to find the additional duration fetched by lookbackDelta for other subsequent splits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we can change some of those code below into dedicated functions. It is very hard to review now due to the complexity
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I refactored it into smaller helper functions and changed some variable names to try and make it more readable
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
func getIntervalFromMaxSplits(r tripperware.Request, baseInterval time.Duration, maxSplitsInt int) time.Duration { | ||
maxSplits := time.Duration(maxSplitsInt) | ||
queryRange := time.Duration((r.GetEnd() - r.GetStart()) * int64(time.Millisecond)) | ||
|
||
// Calculate the multiple n of interval needed to shard query to <= maxSplits | ||
// Calculate the multiple n of interval needed to shard query into <= maxSplits | ||
n := (queryRange + baseInterval*maxSplits - 1) / (baseInterval * maxSplits) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is not straight forward to understand this. Can you add some comments? Same for L227
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I updated the comments. Below is how I would explain this function.
queryRange
is divided by maxSplits
to get the interval. The reason we divide by baseInterval
as well is to get the multiple (n)
that can be rounded up. This ensures the returned interval is a multiple of the base interval.
Adding baseInterval*maxSplits - 1
is to round up the calculated (n)
.
The loop is to handle cases where the first split is shorter than other splits. This happens because queries are split only at multiples of interval. So if the split interval is 8 days, the start time of the next interval can only be at timestamps of days 0, 8, 16, 24... So for a start time at day 6 the first shard will be 2 days long only. This is not accounted for in the calculation above which assumes the full query range is split into intervals of size 8.
The loop exits if, after removing the first split and recalculating (n)
, it is not larger than the one calculated before. If it is larger then the loop will keep incrementing n
and recalculating until the correct one is found. The loop terminates when (n)
interval is twice the query range because that interval is guaranteed to shard the query into 1 shard only.
This is an example 37 day range query from day 6 to 43 that should be split into a maximum of 5 splits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For rounding up, the formula is
ceil(a/b)= (a + (b−1)) / b
Integer division in Go always rounds down. The idea of adding (b - 1)
is that it guarantees that any small remainder (even 1 nanosecond) would round the result up one whole number. If there is no remainder at all then (b - 1)
would get rounded down anyway.
This is where I originally found this method rust-lang/rfcs#2844
But I do agree that it is confusing with two variables in the denominator, so I replaced it with a ceilDiv(a, b)
helper function.
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
// First analyze the query using original start-end time. Duration fetched by lookbackDelta here only reflects the start time of first split | ||
durationFetchedByRange, durationFetchedBySelectors, durationFetchedByLookbackDeltaFirstSplit := analyzeDurationFetchedByQueryExpr(expr, queryStart, queryEnd, baseInterval, lookbackDelta) | ||
|
||
fixedDurationFetched += durationFetchedByRange // Duration fetched by the query range is constant regardless of how many splits the query has |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All split here means horizontal splits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I try to refer to splits as horizontal splits only, while shards is the final total of splits x vertical shards
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Following up on this, if you were referring to perSplitDurationFetched
, then it is actually for every shard too. We can calculate the total duration fetched after splitting while ignoring vertical sharding completely, and at the end the total duration fetched we have can be multiplied by vertical shards
to get the final duration fetched.
Using the same logic, we start by dividing maxFetchedDataDurationPerQuery / time.Duration(queryVerticalShardSize)
, and the rest of the calculation can ignore vertical shard size.
Sorry for the confusion earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good. But we are missing changelog here.
Signed-off-by: Ahmed Hassan <[email protected]>
Signed-off-by: Ahmed Hassan <[email protected]>
What this PR does:
Cortex supports only using a static interval to split range queries. This PR adds two new configs that dynamically adjust the split interval to a multiple of the configured
split_queries_by_interval
depending on the given query.New configs:
1 -
max_shards_per_query
Accepts an
int
value for the total number of shards for a query. The split interval is increased into a multiple ofsplit_queries_by_interval
to ensure that the total number of shards remains below the configured value. This takes into account vertical sharding if it is configured.Examples:
split_queries_by_interval
= 24hmax_shards_per_query
= 302 -
max_fetched_data_duration_per_query
Accepts a
duration
for the total duration of data fetched by all shards of a query. Certain queries can fetch a long duration of data per each shard when executing. This configuration uses a multiple ofsplit_queries_by_interval
to reduce the number of shards so that the total duration of data fetched remains below the configured value.Examples:
split_queries_by_interval
= 24hmax_fetched_data_duration_per_query
= 2400h // 100 daysup
with 30 day range is split into 30 shards using 24h intervalEach shard fetches 1 day of data for a total of 30 days
up[10d]
with 30 day range is split into 6 shards using 120h interval.Each shard fetches [5 + 10] days of data for a total of 90 days.
If the query was split into 30 shards using 24h default interval.
Each shard would fetch [1 + 10] days of data for a total of 330 days.
Checklist
CHANGELOG.md
updated - the order of entries should be[CHANGE]
,[FEATURE]
,[ENHANCEMENT]
,[BUGFIX]