Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sample stats for step invariant and subquery #506

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sort"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -305,6 +307,9 @@ func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -352,6 +357,9 @@ func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryab
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
start: ts,
end: ts,
step: 0,
}, nil
}

Expand Down Expand Up @@ -404,6 +412,9 @@ func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -446,6 +457,9 @@ func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable
warns: warns,
t: RangeQuery,
scanners: scnrs,
start: start,
end: end,
step: step,
}, nil
}

Expand Down Expand Up @@ -522,7 +536,7 @@ func (q *Query) Explain() *ExplainOutputNode {
}

func (q *Query) Analyze() *AnalyzeOutputNode {
if observableRoot, ok := q.exec.(model.ObservableVectorOperator); ok {
if observableRoot, ok := q.exec.(telemetry.ObservableVectorOperator); ok {
return analyzeQuery(observableRoot)
}
return nil
Expand All @@ -534,6 +548,9 @@ type compatibilityQuery struct {
plan logicalplan.Plan
ts time.Time // Empty for range queries.
warns annotations.Annotations
start time.Time
end time.Time
step time.Duration

t QueryType
resultSort resultSorter
Expand Down Expand Up @@ -707,6 +724,10 @@ func (q *compatibilityQuery) Stats() *stats.Statistics {

analysis := q.Analyze()
samples := stats.NewQuerySamples(enablePerStepStats)
if enablePerStepStats {
samples.InitStepTracking(q.start.UnixMilli(), q.end.UnixMilli(), telemetry.StepTrackingInterval(q.step))
}

if analysis != nil {
samples.PeakSamples = int(analysis.PeakSamples())
samples.TotalSamples = analysis.TotalSamples()
Expand Down
71 changes: 67 additions & 4 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4814,6 +4814,47 @@ func TestQueryStats(t *testing.T) {
end: time.Unix(1800, 0),
step: time.Second * 30,
},
{
name: "step invariant with samples",
load: `load 5m
http_requests_total{pod="nginx-1"} 1+1x5
http_requests_total{pod="nginx-2"} 1+2x5`,
query: `sum without (__name__) (http_requests_total @ end())`,
start: time.Unix(1, 0),
end: time.Unix(600, 0),
step: time.Second * 34,
},
{
name: "step invariant without samples",
load: `load 30s
http_requests_total{pod="nginx-1"} 1.00+1.00x15
http_requests_total{pod="nginx-2"} 1+2.00x21`,
query: `pi()`,
start: time.UnixMilli(0),
end: time.UnixMilli(120000),
step: time.Second * 30,
},
{
name: "fuzz subquery without enough samples",
load: `load 30s
http_requests_total{pod="nginx-1"} 1.00+1.00x15
http_requests_total{pod="nginx-2"} 1+2.00x21`,
query: `rate({__name__="http_requests_total"} offset -6s[1h:1m] offset 1m29s)`,
start: time.UnixMilli(0),
end: time.UnixMilli(120000),
step: time.Second * 30,
},
// TODO (harry671003): This is a known case which needs to be fixed upstream.
//{
// name: "fuzz aggregation with scalar param",
// load: `load 30s
// http_requests_total{pod="nginx-1"} -77.00+1.00x15
// http_requests_total{pod="nginx-2"} 1+0.67x21`,
// query: `quantile without (pod) (scalar({__name__="http_requests_total"} offset 2m58s), {__name__="http_requests_total"})`,
// start: time.UnixMilli(0),
// end: time.UnixMilli(221000),
// step: time.Second * 30,
//},
}

for _, tc := range cases {
Expand All @@ -4825,6 +4866,8 @@ func TestQueryStats(t *testing.T) {
Timeout: 300 * time.Second,
MaxSamples: math.MaxInt64,
EnablePerStepStats: true,
EnableAtModifier: true,
EnableNegativeOffset: true,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 { return 30 * time.Second.Milliseconds() },
}
qOpts := promql.NewPrometheusQueryOpts(true, 5*time.Minute)
Expand All @@ -4851,8 +4894,9 @@ func TestQueryStats(t *testing.T) {
stats.NewQueryStats(newStats)

testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
testutil.Equals(t, oldStats.Samples.TotalSamples, newStats.Samples.TotalSamples)
testutil.Equals(t, oldStats.Samples.TotalSamplesPerStep, newStats.Samples.TotalSamplesPerStep)
if oldResult.Err == nil {
testutil.WithGoCmp(samplesComparer).Equals(t, oldStats.Samples, newStats.Samples)
}

// Range query
oldQ, err = oldEngine.NewRangeQuery(ctx, storage, qOpts, tc.query, tc.start, tc.end, tc.step)
Expand All @@ -4868,8 +4912,9 @@ func TestQueryStats(t *testing.T) {
stats.NewQueryStats(newStats)

testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
testutil.Equals(t, oldStats.Samples.TotalSamples, newStats.Samples.TotalSamples)
testutil.Equals(t, oldStats.Samples.TotalSamplesPerStep, newStats.Samples.TotalSamplesPerStep)
if oldResult.Err == nil {
testutil.WithGoCmp(samplesComparer).Equals(t, oldStats.Samples, newStats.Samples)
}
})
}
}
Expand Down Expand Up @@ -5727,6 +5772,24 @@ var (
}
return false
})

// samplesComparer is a cmp option that treats two *stats.QuerySamples as equal
// when their total sample count, their per-step totals and their per-step
// map all match. Other fields of the struct are not compared.
samplesComparer = cmp.Comparer(func(x, y *stats.QuerySamples) bool {
	// Two nil values are equal; a nil value never equals a non-nil one.
	// Checking either side for nil here keeps the field accesses below from
	// dereferencing a nil pointer when only one argument is nil.
	if x == nil || y == nil {
		return x == y
	}
	if x.TotalSamples != y.TotalSamples {
		return false
	}
	if !cmp.Equal(x.TotalSamplesPerStep, y.TotalSamplesPerStep) {
		return false
	}
	return cmp.Equal(x.TotalSamplesPerStepMap(), y.TotalSamplesPerStepMap())
})
)

func queryExplanation(q promql.Query) string {
Expand Down
113 changes: 88 additions & 25 deletions engine/enginefuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/promqltest"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/stretchr/testify/require"

Expand All @@ -32,9 +33,31 @@ import (
const testRuns = 100

type testCase struct {
query string
loads []string
oldRes, newRes *promql.Result
query string
loads []string
oldRes, newRes *promql.Result
oldStats, newStats *stats.Statistics
start, end time.Time
interval time.Duration
validateSamples bool
}

// shouldValidateSamples reports whether sample statistics can be compared
// for expr. It returns false as soon as a call to the scalar function is
// found anywhere in the expression, because that is a known case where the
// Prometheus engine and the Thanos engine return different samples.
func shouldValidateSamples(expr parser.Expr) bool {
	canCompare := true

	parser.Inspect(expr, func(node parser.Node, path []parser.Node) error {
		call, isCall := node.(*parser.Call)
		if !isCall || call.Func.Name != "scalar" {
			return nil
		}
		canCompare = false
		// The error value is never read by the caller; presumably a non-nil
		// error makes Inspect stop walking early (confirm against parser.Inspect).
		return errors.New("error")
	})
	return canCompare
}

func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
Expand All @@ -61,7 +84,9 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePerStepStats: true,
}
qOpts := promql.NewPrometheusQueryOpts(true, 0)

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()
Expand All @@ -80,22 +105,22 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
}
ps := promqlsmith.New(rnd, seriesSet, psOpts...)

newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true})
newEngine := engine.New(engine.Opts{EngineOpts: opts, DisableFallback: true, EnableAnalysis: true})
oldEngine := promql.NewEngine(opts)

var (
q1 promql.Query
query string
q1 promql.Query
query string
validateSamples bool
)
cases := make([]*testCase, testRuns)
for i := 0; i < testRuns; i++ {
// Since we disabled fallback, keep trying until we find a query
// that can be natively executed by the engine.
// Parsing an experimental function, such as mad_over_time, results in parser.ParseErrors, so we also ignore those.
for {
expr := ps.WalkRangeQuery()
validateSamples = shouldValidateSamples(expr)

query = expr.Pretty(0)
q1, err = newEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval)
q1, err = newEngine.NewRangeQuery(context.Background(), storage, qOpts, query, start, end, interval)
if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) {
continue
} else {
Expand All @@ -105,17 +130,27 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {

testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
newStats := q1.Stats()
stats.NewQueryStats(newStats)

q2, err := oldEngine.NewRangeQuery(context.Background(), storage, nil, query, start, end, interval)
q2, err := oldEngine.NewRangeQuery(context.Background(), storage, qOpts, query, start, end, interval)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
oldStats := q2.Stats()
stats.NewQueryStats(oldStats)

cases[i] = &testCase{
query: query,
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
query: query,
newRes: newResult,
newStats: newStats,
oldRes: oldResult,
oldStats: oldStats,
loads: []string{load},
start: start,
end: end,
interval: interval,
validateSamples: validateSamples,
}
}
validateTestCases(t, cases)
Expand All @@ -141,7 +176,9 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
EnablePerStepStats: true,
}
qOpts := promql.NewPrometheusQueryOpts(true, 0)

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()
Expand All @@ -151,6 +188,7 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
EngineOpts: opts,
DisableFallback: true,
LogicalOptimizers: logicalplan.AllOptimizers,
EnableAnalysis: true,
})
oldEngine := promql.NewEngine(opts)

Expand All @@ -176,8 +214,11 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
// Parsing an experimental function, such as mad_over_time, results in parser.ParseErrors, so we also ignore those.
for {
expr := ps.WalkInstantQuery()
if !shouldValidateSamples(expr) {
continue
}
query = expr.Pretty(0)
q1, err = newEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime)
q1, err = newEngine.NewInstantQuery(context.Background(), storage, qOpts, query, queryTime)
if errors.Is(err, parse.ErrNotSupportedExpr) || errors.Is(err, parse.ErrNotImplemented) || errors.As(err, &parser.ParseErrors{}) {
continue
} else {
Expand All @@ -187,17 +228,26 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {

testutil.Ok(t, err)
newResult := q1.Exec(context.Background())
newStats := q1.Stats()
stats.NewQueryStats(newStats)

q2, err := oldEngine.NewInstantQuery(context.Background(), storage, nil, query, queryTime)
q2, err := oldEngine.NewInstantQuery(context.Background(), storage, qOpts, query, queryTime)
testutil.Ok(t, err)

oldResult := q2.Exec(context.Background())
oldStats := q2.Stats()
stats.NewQueryStats(oldStats)

cases[i] = &testCase{
query: query,
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
query: query,
newRes: newResult,
newStats: newStats,
oldRes: oldResult,
oldStats: oldStats,
loads: []string{load},
start: queryTime,
end: queryTime,
validateSamples: true,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -444,14 +494,27 @@ func getSeries(ctx context.Context, q storage.Queryable) ([]labels.Labels, error

func validateTestCases(t *testing.T, cases []*testCase) {
failures := 0
logQuery := func(c *testCase) {
for _, load := range c.loads {
t.Logf(load)
}
t.Logf("query: %s, start: %d, end: %d, interval: %v", c.query, c.start.UnixMilli(), c.end.UnixMilli(), c.interval)
}
for i, c := range cases {
if !cmp.Equal(c.oldRes, c.newRes, comparer) {
for _, load := range c.loads {
t.Logf(load)
}
t.Logf(c.query)
logQuery(c)

t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes.String(), c.oldRes.String())
//failures++
continue
}
if !c.validateSamples || c.oldRes.Err != nil {
// Skip sample comparison
continue
}
if !cmp.Equal(c.oldStats.Samples, c.newStats.Samples, samplesComparer) {
logQuery(c)
t.Logf("case: %d, samples mismatch. total samples: old: %v, new: %v. samples per step: old: %v, new: %v", i, c.oldStats.Samples.TotalSamples, c.newStats.Samples.TotalSamples, c.oldStats.Samples.TotalSamplesPerStep, c.newStats.Samples.TotalSamplesPerStep)
failures++
}
}
Expand Down
Loading
Loading