Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 90 additions & 53 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,51 @@ type RulesLimits interface {
RulerExternalLabels(userID string) labels.Labels
}

// QueryExecutor evaluates the query string qs at the instant t and returns
// the result as a promql.Vector.
type QueryExecutor func(ctx context.Context, qs string, t time.Time) (promql.Vector, error)

// EngineQueryFunc returns a new engine query function validating max queryLength.
// Modified from Prometheus rules.EngineQueryFunc
// https://github.com/prometheus/prometheus/blob/v2.39.1/rules/manager.go#L189.
//
// When frontendClient is non-nil, queries are sent to the query frontend via
// its InstantQuery method; otherwise they are evaluated with engine against q.
// In both cases the chosen executor is passed through wrapWithMiddleware
// together with overrides, userID and lookbackDelta.
func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient, q storage.Queryable, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
	if frontendClient != nil {
		// Query through the query frontend.
		return wrapWithMiddleware(frontendClient.InstantQuery, overrides, userID, lookbackDelta)
	}

	// Query through the PromQL engine.
	var executor QueryExecutor = func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		return executeQuery(ctx, engine, q, qs, t)
	}
	return wrapWithMiddleware(executor, overrides, userID, lookbackDelta)
}

// executeQuery runs qs as an instant query at time t on engine, reading from q.
//
// A vector result is returned unchanged. A scalar result is converted into a
// one-sample vector that carries the scalar's timestamp and value with an
// empty label set. Any other result type produces an error. The query is
// closed before the function returns.
func executeQuery(ctx context.Context, engine promql.QueryEngine, q storage.Queryable, qs string, t time.Time) (promql.Vector, error) {
	instant, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
	if err != nil {
		return nil, err
	}
	defer instant.Close()

	result := instant.Exec(ctx)
	if result.Err != nil {
		return nil, result.Err
	}

	if vec, ok := result.Value.(promql.Vector); ok {
		return vec, nil
	}
	if scalar, ok := result.Value.(promql.Scalar); ok {
		sample := promql.Sample{
			T:      scalar.T,
			F:      scalar.V,
			Metric: labels.Labels{},
		}
		return promql.Vector{sample}, nil
	}
	return nil, errors.New("rule result is not a vector or scalar")
}

func wrapWithMiddleware(next QueryExecutor, overrides RulesLimits, userID string, lookbackDelta time.Duration) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
// Enforce the max query length.
maxQueryLength := overrides.MaxQueryLength(userID)
Expand All @@ -192,35 +233,7 @@ func EngineQueryFunc(engine promql.QueryEngine, frontendClient *frontendClient,
}
ctx = requestmeta.ContextWithRequestSource(ctx, requestmeta.SourceRuler)

if frontendClient != nil {
v, err := frontendClient.InstantQuery(ctx, qs, t)
if err != nil {
return nil, err
}

return v, nil
} else {
q, err := engine.NewInstantQuery(ctx, q, nil, qs, t)
if err != nil {
return nil, err
}
res := q.Exec(ctx)
if res.Err != nil {
return nil, res.Err
}
switch v := res.Value.(type) {
case promql.Vector:
return v, nil
case promql.Scalar:
return promql.Vector{promql.Sample{
T: v.T,
F: v.V,
Metric: labels.Labels{},
}}, nil
default:
return nil, errors.New("rule result is not a vector or scalar")
}
}
return next(ctx, qs, t)
}
}

Expand Down Expand Up @@ -337,36 +350,25 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
// Errors from PromQL are always "user" errors.
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)

return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, frontendPool *client.Pool, reg prometheus.Registerer) (RulesManager, error) {
var client *frontendClient
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)

if cfg.FrontendAddress != "" {
c, err := frontendPool.GetClientFor(cfg.FrontendAddress)
if err != nil {
return nil, err
}
client = c.(*frontendClient)
qfeClient, err := resolveFrontendClient(cfg.FrontendAddress, frontendPool)
if err != nil {
return nil, err
}
var queryFunc rules.QueryFunc
engineQueryFunc := EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta)
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
if cfg.EnableQueryStats {
queryFunc = RecordAndReportRuleQueryMetrics(metricsQueryFunc, userID, evalMetrics, logger)
} else {
queryFunc = metricsQueryFunc

if qfeClient == nil && engine == nil {
return nil, fmt.Errorf("neither engine nor frontend client is configured for user %s", userID)
}

queryFunc := buildQueryFunc(engine, qfeClient, q, overrides, userID, cfg, evalMetrics, logger)
// We let the Prometheus rules manager control the context so that there is a chance
// for graceful shutdown of rules that are still in execution even in case the cortex context is canceled.
prometheusContext := user.InjectOrgID(context.WithoutCancel(ctx), userID)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Appendable: NewPusherAppendable(p, userID, overrides,
evalMetrics.TotalWritesVec.WithLabelValues(userID),
evalMetrics.FailedWritesVec.WithLabelValues(userID)),
Queryable: q,
QueryFunc: queryFunc,
Context: prometheusContext,
Expand All @@ -387,6 +389,41 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
}
}

// resolveFrontendClient looks up the query-frontend client for addr in pool.
//
// An empty addr returns (nil, nil) so the caller can treat "no frontend
// address" as a non-error case. Otherwise the client is fetched with
// pool.GetClientFor and must be a *frontendClient.
//
// An error is returned when pool is nil while addr is set, when the pool
// lookup fails, or when the pooled client is not a *frontendClient. The last
// case is reported as an error instead of panicking on the type assertion.
func resolveFrontendClient(addr string, pool *client.Pool) (*frontendClient, error) {
	if addr == "" {
		return nil, nil
	}
	if pool == nil {
		return nil, fmt.Errorf("frontend client pool is not configured for address %q", addr)
	}

	c, err := pool.GetClientFor(addr)
	if err != nil {
		return nil, err
	}

	fc, ok := c.(*frontendClient)
	if !ok {
		return nil, fmt.Errorf("unexpected frontend client type %T for address %q", c, addr)
	}
	return fc, nil
}

// buildQueryFunc assembles the rules.QueryFunc used for rule evaluation for userID.
//
// The base function comes from EngineQueryFunc and is wrapped by
// MetricsQueryFunc with the per-user total/failed query counters. When
// cfg.EnableQueryStats is set, that result is wrapped once more by
// RecordAndReportRuleQueryMetrics; otherwise it is returned as is.
func buildQueryFunc(
	engine promql.QueryEngine,
	client *frontendClient,
	q storage.Queryable,
	overrides RulesLimits,
	userID string,
	cfg Config,
	metrics *RuleEvalMetrics,
	logger log.Logger,
) rules.QueryFunc {
	// Base query function plus the query-count metrics layer.
	withMetrics := MetricsQueryFunc(
		EngineQueryFunc(engine, client, q, overrides, userID, cfg.LookbackDelta),
		metrics.TotalQueriesVec.WithLabelValues(userID),
		metrics.FailedQueriesVec.WithLabelValues(userID),
	)

	if !cfg.EnableQueryStats {
		return withMetrics
	}

	// Optional query statistics layer.
	return RecordAndReportRuleQueryMetrics(withMetrics, userID, metrics, logger)
}

// QueryableError is an error type that holds another error.
// NOTE(review): presumably used to mark errors that originate from the
// storage Queryable (see WrapQueryableErrors) — confirm against its callers.
type QueryableError struct {
// err is the error held by this QueryableError.
err error
}
Expand Down
Loading