Skip to content

Commit

Permalink
engine: allow to override opts at query time
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
Michael Hoffmann committed Dec 15, 2024
1 parent 2f49f80 commit fc40cf4
Showing 1 changed file with 32 additions and 43 deletions.
75 changes: 32 additions & 43 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
return optimizers
}

// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
type QueryOpts struct {
promql.QueryOpts

// DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
DecodingConcurrency int
}

// New creates a new query engine with the given options. The query engine will
// use the storage passed in NewInstantQuery and NewRangeQuery for retrieving
// data when executing queries.
Expand Down Expand Up @@ -239,31 +247,12 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
if err != nil {
return nil, err
}

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}

// determine sorting order before optimizers run, we do this by looking for "sort"
// and "sort_desc" and optimize them away afterwards since they are only needed at
// the presentation layer and not when computing the results.
resultSort := newResultSort(expr)

qOpts := &query.Options{
Start: ts,
End: ts,
Step: 0,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
Expand Down Expand Up @@ -308,13 +297,6 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
}
defer e.activeQueryTracker.Delete(idx)

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -371,12 +353,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}
if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -420,12 +396,6 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}
defer e.activeQueryTracker.Delete(idx)

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -462,19 +432,38 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts promql.QueryOpts) *query.Options {
qOpts := &query.Options{
res := &query.Options{
Start: start,
End: end,
Step: step,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
LookbackDelta: e.lookbackDelta,
EnablePerStepStats: e.enablePerStepStats,
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
return qOpts
if opts == nil {
return res
}

if opts.LookbackDelta() > 0 {
res.LookbackDelta = opts.LookbackDelta()
}
if opts.EnablePerStepStats() {
res.EnablePerStepStats = opts.EnablePerStepStats()
}

extOpts, ok := opts.(*QueryOpts)
if !ok {
return res
}

if extOpts.DecodingConcurrency != 0 {
res.DecodingConcurrency = extOpts.DecodingConcurrency
}
return res
}

func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
Expand Down

0 comments on commit fc40cf4

Please sign in to comment.