Skip to content

Commit

Permalink
fix bug in quantile over time sharding that could cause us to
Browse files Browse the repository at this point in the history
evaluate more steps than we needed to, especially in the case of instant
queries!

Signed-off-by: Callum Styan <[email protected]>
  • Loading branch information
cstyan committed Feb 17, 2024
1 parent 23b1e38 commit fbdb52a
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
79 changes: 79 additions & 0 deletions pkg/logql/downstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,70 @@ func TestMappingEquivalenceSketches(t *testing.T) {
}
}

func TestMappingEquivalenceSketches_Instant(t *testing.T) {
var (
shards = 3
nStreams = 10_000
rounds = 20
streams = randomStreams(nStreams, rounds+1, shards, []string{"a", "b", "c", "d"}, true)
start = time.Unix(0, int64(rounds+1))
end = time.Unix(0, int64(rounds+1))
step = time.Duration(0)
interval = time.Duration(0)
limit = 100
)

for _, tc := range []struct {
query string
realtiveError float64
}{
{`quantile_over_time(0.70, {a=~".+"} | logfmt | unwrap value [10m]) by (a)`, 0.03},
{`quantile_over_time(0.99, {a=~".+"} | logfmt | unwrap value [10m]) by (a)`, 0.02},
} {
q := NewMockQuerier(
shards,
streams,
)

opts := EngineOpts{}
regular := NewEngine(opts, q, NoLimits, log.NewNopLogger())
sharded := NewDownstreamEngine(opts, MockDownstreamer{regular}, NoLimits, log.NewNopLogger())

t.Run(tc.query, func(t *testing.T) {
params, err := NewLiteralParams(
tc.query,
start,
end,
step,
interval,
logproto.FORWARD,
uint32(limit),
nil,
)
require.NoError(t, err)
qry := regular.Query(params)
ctx := user.InjectOrgID(context.Background(), "fake")

mapper := NewShardMapper(ConstantShards(shards), nilShardMetrics, []string{ShardQuantileOverTime})
_, _, mapped, err := mapper.Parse(params.GetExpression())
require.NoError(t, err)

shardedQry := sharded.Query(ctx, ParamsWithExpressionOverride{
Params: params,
ExpressionOverride: mapped,
})

res, err := qry.Exec(ctx)
require.NoError(t, err)

shardedRes, err := shardedQry.Exec(ctx)
require.NoError(t, err)

relativeErrorVector(t, res.Data.(promql.Vector), shardedRes.Data.(promql.Vector), tc.realtiveError)
})
}
}

func TestShardCounter(t *testing.T) {
var (
shards = 3
Expand Down Expand Up @@ -546,6 +610,21 @@ func relativeError(t *testing.T, expected, actual promql.Matrix, alpha float64)
}
}

func relativeErrorVector(t *testing.T, expected, actual promql.Vector, alpha float64) {
require.Len(t, actual, len(expected))

e := make([]float64, len(expected))
a := make([]float64, len(expected))
for i := 0; i < len(expected); i++ {
require.Equal(t, expected[i].Metric, actual[i].Metric)

e[i] = expected[i].F
a[i] = expected[i].F
}
require.InEpsilonSlice(t, e, a, alpha)

}

func TestFormat_ShardedExpr(t *testing.T) {
oldMax := syntax.MaxCharsPerLine
syntax.MaxCharsPerLine = 20
Expand Down
5 changes: 5 additions & 0 deletions pkg/logql/quantile_over_time_sketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func JoinQuantileSketchVector(next bool, r StepResult, stepEvaluator StepEvaluat
if stepEvaluator.Error() != nil {
return nil, stepEvaluator.Error()
}
// Don't continue to evaluate more steps if we already filled the
// # of steps we should have based on start/end and step params.
if len(result) == stepCount {
break
}
}

return result, stepEvaluator.Error()
Expand Down

0 comments on commit fbdb52a

Please sign in to comment.