Add batched streaming aggregations (thanos-io#324)
* Add batched streaming aggregations

With the current model, each Next call is expected to return samples
for unique steps. This approach is simple, but for high-cardinality
queries (100K+ series) it uses a lot of memory because the per-step
buffers become large.

This commit resolves that by allowing aggregates to handle batches for
the same step arriving over subsequent Next calls. Selectors gain a
batchSize parameter, which is injected when a streaming aggregate is
present in the plan and puts an upper limit on the size of the output
vectors they produce.
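
To illustrate the idea (a standalone sketch, not the operator code from this
commit; the batch and stepSum types are hypothetical), an aggregate that
accepts batched input keeps one accumulator per output step and folds every
incoming batch into it, no matter how many Next calls the samples for that
step are spread across:

package main

import "fmt"

// batch holds samples that all belong to the same step. In the engine, a
// selector caps the length of such slices at the configured batch size.
type batch struct {
	step   int64     // step timestamp the samples belong to
	values []float64 // sample values for a subset of the series
}

// stepSum folds batches into a running sum() per step. Because batches for
// the same step can arrive across several Next calls, it accumulates instead
// of expecting one complete vector per step.
type stepSum struct {
	totals map[int64]float64
}

func (s *stepSum) addBatch(b batch) {
	for _, v := range b.values {
		s.totals[b.step] += v
	}
}

func main() {
	agg := &stepSum{totals: map[int64]float64{}}
	// Four series at step 0 arrive as two batches of two, e.g. from a
	// selector with a batch size of 2; step 30000 arrives as a single batch.
	for _, b := range []batch{
		{step: 0, values: []float64{1, 2}},
		{step: 0, values: []float64{3, 4}},
		{step: 30000, values: []float64{5, 6}},
	} {
		agg.addBatch(b)
	}
	fmt.Println(agg.totals) // map[0:10 30000:11]
}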

Signed-off-by: Filip Petkovski <[email protected]>

* Remove unused method

Signed-off-by: Filip Petkovski <[email protected]>

* Fix traverse

Signed-off-by: Filip Petkovski <[email protected]>

* Fix acceptance tests

Signed-off-by: Filip Petkovski <[email protected]>

* Add group test and fix failure

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
fpetkovski authored Nov 19, 2023
1 parent fcf8381 commit 998354b
Showing 19 changed files with 675 additions and 294 deletions.
46 changes: 31 additions & 15 deletions engine/bench_test.go
@@ -6,6 +6,7 @@ package engine_test
import (
"context"
"fmt"
"runtime"
"strconv"
"strings"
"testing"
@@ -86,6 +87,10 @@ func BenchmarkChunkDecoding(b *testing.B) {
}

func BenchmarkSingleQuery(b *testing.B) {
b.StopTimer()
memProfileRate := runtime.MemProfileRate
runtime.MemProfileRate = 0

test := setupStorage(b, 5000, 3, 720)
defer test.Close()

@@ -94,10 +99,16 @@ func BenchmarkSingleQuery(b *testing.B) {
step := time.Second * 30

query := "sum(rate(http_requests_total[2m]))"
b.ResetTimer()
opts := engine.Opts{
EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second},
DisableFallback: true,
SelectorBatchSize: 256,
}
b.ReportAllocs()
b.StartTimer()
runtime.MemProfileRate = memProfileRate
for i := 0; i < b.N; i++ {
result := executeRangeQuery(b, query, test, start, end, step)
result := executeRangeQuery(b, query, test, start, end, step, opts)
testutil.Ok(b, result.Err)
}
}
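
The profiling changes in BenchmarkSingleQuery above zero runtime.MemProfileRate
while the test data is built and restore it just before the timed loop, so a
heap profile taken with -memprofile reflects only the queries themselves. A
minimal standalone sketch of the same pattern (the fixture and the timed loop
body are placeholders, not code from this commit):

package engine_test

import (
	"runtime"
	"strings"
	"testing"
)

// BenchmarkQueryOnly demonstrates excluding setup allocations from memory
// profiles: profiling is switched off during setup and switched back on
// right before the timed loop.
func BenchmarkQueryOnly(b *testing.B) {
	b.StopTimer()
	memProfileRate := runtime.MemProfileRate
	runtime.MemProfileRate = 0

	// Allocation-heavy setup that should not appear in -memprofile output.
	fixture := make([]string, 0, 1<<16)
	for i := 0; i < 1<<16; i++ {
		fixture = append(fixture, strings.Repeat("x", 16))
	}

	b.ReportAllocs()
	b.StartTimer()
	runtime.MemProfileRate = memProfileRate
	for i := 0; i < b.N; i++ {
		total := 0
		for _, s := range fixture {
			total += len(s)
		}
		_ = total
	}
}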
@@ -274,24 +285,29 @@ func BenchmarkRangeQuery(b *testing.B) {
},
}

opts := engine.Opts{
EngineOpts: promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 50000000,
Timeout: 100 * time.Second,
EnableAtModifier: true,
EnableNegativeOffset: true,
},
SelectorBatchSize: 256,
}

for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
b.ReportAllocs()
b.Run("old_engine", func(b *testing.B) {
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 50000000,
Timeout: 100 * time.Second,
EnableAtModifier: true,
EnableNegativeOffset: true,
}
engine := promql.NewEngine(opts)

promEngine := promql.NewEngine(opts.EngineOpts)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
testutil.Ok(b, err)

oldResult := qry.Exec(context.Background())
@@ -303,7 +319,7 @@
b.ReportAllocs()

for i := 0; i < b.N; i++ {
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step)
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step, opts)
testutil.Ok(b, newResult.Err)
}
})
@@ -562,8 +578,8 @@ func BenchmarkMergeSelectorsOptimizer(b *testing.B) {

}

func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration) *promql.Result {
return executeRangeQueryWithOpts(b, q, storage, start, end, step, engine.Opts{DisableFallback: true, EngineOpts: promql.EngineOpts{Timeout: 100 * time.Second}})
func executeRangeQuery(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
return executeRangeQueryWithOpts(b, q, storage, start, end, step, opts)
}

func executeRangeQueryWithOpts(b *testing.B, q string, storage *teststorage.TestStorage, start time.Time, end time.Time, step time.Duration, opts engine.Opts) *promql.Result {
9 changes: 9 additions & 0 deletions engine/engine.go
@@ -83,6 +83,9 @@ type Opts struct {

// EnableAnalysis enables query analysis.
EnableAnalysis bool

// SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch.
SelectorBatchSize int64
}

func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
@@ -180,6 +183,12 @@ func New(opts Opts) *compatibilityEngine {
opts.ExtLookbackDelta = 1 * time.Hour
level.Debug(opts.Logger).Log("msg", "externallookback delta is zero, setting to default value", "value", 1*24*time.Hour)
}
if opts.SelectorBatchSize != 0 {
opts.LogicalOptimizers = append(
[]logicalplan.Optimizer{logicalplan.SelectorBatchSize{Size: opts.SelectorBatchSize}},
opts.LogicalOptimizers...,
)
}

functions := make(map[string]*parser.Function, len(parser.Functions))
for k, v := range parser.Functions {
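
For callers, enabling the optimization only requires setting the new field on
engine.Opts; when it is non-zero, engine.New prepends the
logicalplan.SelectorBatchSize optimizer itself, as shown in the hunk above. A
hedged usage sketch based on the options and calls visible in this diff (the
import paths and the surrounding function are assumptions, not part of the
change):

package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/prometheus/prometheus/storage"

	"github.com/thanos-io/promql-engine/engine"
)

// runBatchedRangeQuery runs a range query with selector batching enabled.
// Setting SelectorBatchSize is all a caller needs to do; the optimizer is
// injected by engine.New when the value is non-zero.
func runBatchedRangeQuery(ctx context.Context, q storage.Queryable, expr string, start, end time.Time, step time.Duration) error {
	ng := engine.New(engine.Opts{
		EngineOpts:        promql.EngineOpts{Timeout: 100 * time.Second},
		DisableFallback:   true,
		SelectorBatchSize: 256, // selectors emit at most 256 samples per batch
	})

	qry, err := ng.NewRangeQuery(ctx, q, nil, expr, start, end, step)
	if err != nil {
		return err
	}
	defer qry.Close()

	res := qry.Exec(ctx)
	return res.Err
}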
17 changes: 13 additions & 4 deletions engine/engine_test.go
@@ -1103,10 +1103,17 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
{
name: "group",
load: `load 30s
http_requests_total{pod="nginx-1"} 1+1x15
http_requests_total{pod="nginx-2"} 1+2x18`,
http_requests_total{pod="nginx-1"} 2+1x15
http_requests_total{pod="nginx-2"} 2+2x18`,
query: `group(http_requests_total)`,
},
{
name: "group by ",
load: `load 30s
http_requests_total{pod="nginx-1"} 2+1x15
http_requests_total{pod="nginx-2"} 2+2x18`,
query: `group by (pod) (http_requests_total)`,
},
{
name: "resets",
load: `load 30s
@@ -1907,6 +1914,8 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
EngineOpts: opts,
DisableFallback: disableFallback,
LogicalOptimizers: optimizers,
// Set to 1 to make sure batching is tested.
SelectorBatchSize: 1,
})
ctx := context.Background()
q1, err := newEngine.NewRangeQuery(ctx, storage, nil, tc.query, tc.start, tc.end, tc.step)
@@ -1920,7 +1929,7 @@ func TestQueriesAgainstOldEngine(t *testing.T) {
defer q2.Close()
oldResult := q2.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, newResult, oldResult)
testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult)
})
}
})
@@ -4667,7 +4676,7 @@ func testNativeHistograms(t *testing.T, cases []histogramTestCase, opts promql.E
testutil.Assert(t, len(promVector) == 0)
}

testutil.WithGoCmp(comparer).Equals(t, newResult, promResult)
testutil.WithGoCmp(comparer).Equals(t, promResult, newResult)
})

t.Run("range", func(t *testing.T) {