Skip to content

Commit

Permalink
engine: add more constructors
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
Michael Hoffmann committed Dec 16, 2024
1 parent fc40cf4 commit 50340b9
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
30 changes: 19 additions & 11 deletions engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,45 @@ func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *Distributed
func (l DistributedEngine) SetQueryLogger(log promql.QueryLogger) {}

func (l DistributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return l.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
}

func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
}

func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

return l.remoteEngine.NewInstantQuery(ctx, q, opts, qs, ts)
return l.remoteEngine.MakeInstantQueryFromPlan(ctx, q, opts, plan, ts)
}

func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

return l.remoteEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
return l.remoteEngine.MakeRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
}

func (l DistributedEngine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)
ts = ts.Truncate(time.Second)

return l.remoteEngine.NewRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
return l.remoteEngine.MakeInstantQuery(ctx, q, opts, qs, ts)
}

func (l DistributedEngine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

return l.remoteEngine.NewInstantQueryFromPlan(ctx, q, opts, plan, ts)
return l.remoteEngine.MakeRangeQuery(ctx, q, opts, qs, start, end, interval)
}
54 changes: 35 additions & 19 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,27 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {

// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
type QueryOpts struct {
promql.QueryOpts
lookbackDelta time.Duration

enablePerStepStats bool

// DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
DecodingConcurrency int
}

func (opts QueryOpts) LookbackDelta() time.Duration { return opts.lookbackDelta }
func (opts QueryOpts) EnablePerStepStats() bool { return opts.enablePerStepStats }

func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts {
if opts == nil {
return &QueryOpts{}
}
return &QueryOpts{
lookbackDelta: opts.LookbackDelta(),
enablePerStepStats: opts.EnablePerStepStats(),
}
}

// New creates a new query engine with the given options. The query engine will
// use the storage passed in NewInstantQuery and NewRangeQuery for retrieving
// data when executing queries.
Expand Down Expand Up @@ -236,13 +251,7 @@ type Engine struct {
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
if err != nil {
return nil, err
Expand Down Expand Up @@ -290,7 +299,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
}, nil
}

func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
Expand Down Expand Up @@ -337,7 +346,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
}, nil
}

func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -389,7 +398,7 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
}, nil
}

func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
Expand Down Expand Up @@ -431,7 +440,19 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}, nil
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts promql.QueryOpts) *query.Options {
// PromQL compatibility constructors

// NewInstantQuery implements the promql.Engine interface.
func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return e.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
}

// NewRangeQuery implements the promql.Engine interface.
func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step)
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options {
res := &query.Options{
Start: start,
End: end,
Expand All @@ -455,13 +476,8 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
res.EnablePerStepStats = opts.EnablePerStepStats()
}

extOpts, ok := opts.(*QueryOpts)
if !ok {
return res
}

if extOpts.DecodingConcurrency != 0 {
res.DecodingConcurrency = extOpts.DecodingConcurrency
if opts.DecodingConcurrency != 0 {
res.DecodingConcurrency = opts.DecodingConcurrency
}
return res
}
Expand Down

0 comments on commit 50340b9

Please sign in to comment.