diff --git a/pkg/logql/downstream_test.go b/pkg/logql/downstream_test.go index ec5f3170468d0..0bd7f8b12197b 100644 --- a/pkg/logql/downstream_test.go +++ b/pkg/logql/downstream_test.go @@ -181,6 +181,70 @@ func TestMappingEquivalenceSketches(t *testing.T) { } } +func TestMappingEquivalenceSketches_Instant(t *testing.T) { + var ( + shards = 3 + nStreams = 10_000 + rounds = 20 + streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true) + start = time.Unix(0, int64(rounds+1)) + end = time.Unix(0, int64(rounds+1)) + step = time.Duration(0) + interval = time.Duration(0) + limit = 100 + ) + + for _, tc := range []struct { + query string + realtiveError float64 + }{ + {`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [10m]) by (a)`, 0.03}, + {`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [10m]) by (a)`, 0.02}, + } { + q := NewMockQuerier( + shards, + streams, + ) + + opts := EngineOpts{} + regular := NewEngine(opts, q, NoLimits, log.NewNopLogger()) + sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger()) + + t.Run(tc.query, func(t *testing.T) { + params, err := NewLiteralParams( + tc.query, + start, + end, + step, + interval, + logproto.FORWARD, + uint32(limit), + nil, + ) + require.NoError(t, err) + qry := regular.Query(params) + ctx := user.InjectOrgID(context.Background(), "fake") + + mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics, []string{ShardQuantileOverTime}) + _, _, mapped, err := mapper.Parse(params.GetExpression()) + require.NoError(t, err) + + shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{ + Params: params, + ExpressionOverride: mapped, + }) + + res, err := qry.Exec(ctx) + require.NoError(t, err) + + shardedRes, err := shardedQry.Exec(ctx) + require.NoError(t, err) + + relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError) + }) + } +} + func TestShardCounter(t *testing.T) { var ( shards = 3 @@ -546,6 +610,21 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64) } } +func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha float64) { + require.Len(t, actual, len(expected)) + + e := make([]float64, len(expected)) + a := make([]float64, len(expected)) + for i := 0; i < len(expected); i++ { + require.Equal(t, expected[i].Metric, actual[i].Metric) + + e[i] = expected[i].F + a[i] = expected[i].F + } + require.InEpsilonSlice(t, e, a, alpha) + +} + func TestFormat_ShardedExpr(t *testing.T) { oldMax := syntax.MaxCharsPerLine syntax.MaxCharsPerLine = 20 diff --git a/pkg/logql/quantile_over_time_sketch.go b/pkg/logql/quantile_over_time_sketch.go index 507c72b208ab8..b13f233a2cc2d 100644 --- a/pkg/logql/quantile_over_time_sketch.go +++ b/pkg/logql/quantile_over_time_sketch.go @@ -283,6 +283,11 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat if stepEvaluator.Error() != nil { return nil, stepEvaluator.Error() } + // Don't continue to evaluate more steps if we already filled the + // # of steps we should have based on start/end and step params. + if len(result) == stepCount { + break + } } return result, stepEvaluator.Error()