Skip to content

Commit

Permalink
Logql benchmark and performance improvement. (#1371)
Browse files Browse the repository at this point in the history
* Add logql benchmark test.

Signed-off-by: Cyril Tovena <[email protected]>

* Fixes store benchmark limits.

Signed-off-by: Cyril Tovena <[email protected]>

* Add LogQL improvements.

Signed-off-by: Cyril Tovena <[email protected]>

* Add back logging in the storage benchmark

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena authored Dec 9, 2019
1 parent ea5eec2 commit 1fc0ffd
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 23 deletions.
17 changes: 8 additions & 9 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,26 +602,27 @@ type PeekingEntryIterator interface {
func NewPeekingIterator(iter EntryIterator) PeekingEntryIterator {
// initialize the next entry so we can peek right from the start.
var cache *entryWithLabels
next := &entryWithLabels{}
if iter.Next() {
cache = &entryWithLabels{
entry: iter.Entry(),
labels: iter.Labels(),
}
next.entry = cache.entry
next.labels = cache.labels
}
return &peekingEntryIterator{
iter: iter,
cache: cache,
next: cache,
next: next,
}
}

// Next implements `EntryIterator`
func (it *peekingEntryIterator) Next() bool {
if it.cache != nil {
it.next = &entryWithLabels{
entry: it.cache.entry,
labels: it.cache.labels,
}
it.next.entry = it.cache.entry
it.next.labels = it.cache.labels
it.cacheNext()
return true
}
Expand All @@ -631,10 +632,8 @@ func (it *peekingEntryIterator) Next() bool {
// cacheNext caches the next element if it exists.
func (it *peekingEntryIterator) cacheNext() {
if it.iter.Next() {
it.cache = &entryWithLabels{
entry: it.iter.Entry(),
labels: it.iter.Labels(),
}
it.cache.entry = it.iter.Entry()
it.cache.labels = it.iter.Labels()
return
}
// nothing left removes the cached entry
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,9 @@ type groupedAggregation struct {
// Evaluator implements `SampleExpr` for a vectorAggregationExpr
// this is copied and adapted from Prometheus vector aggregation code.
func (v *vectorAggregationExpr) Evaluator() StepEvaluator {
nextEvaluator := v.left.Evaluator()
return StepEvaluatorFn(func() (bool, int64, promql.Vector) {
next, ts, vec := v.left.Evaluator().Next()
next, ts, vec := nextEvaluator.Next()
if !next {
return false, 0, promql.Vector{}
}
Expand Down
80 changes: 80 additions & 0 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,86 @@ func TestEngine_NewRangeQuery(t *testing.T) {
}
}

// go test -mod=vendor ./pkg/logql/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out
func BenchmarkRangeQuery100000(b *testing.B) {
benchmarkRangeQuery(int64(100000), b)
}
func BenchmarkRangeQuery200000(b *testing.B) {
benchmarkRangeQuery(int64(200000), b)
}
func BenchmarkRangeQuery500000(b *testing.B) {
benchmarkRangeQuery(int64(500000), b)
}

func BenchmarkRangeQuery1000000(b *testing.B) {
benchmarkRangeQuery(int64(1000000), b)
}

var result promql.Value

func benchmarkRangeQuery(testsize int64, b *testing.B) {
b.ReportAllocs()
eng := NewEngine(EngineOpts{})
start := time.Unix(0, 0)
end := time.Unix(testsize, 0)
querier := getLocalQuerier(testsize)
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, test := range []struct {
qs string
direction logproto.Direction
}{
{`{app="foo"}`, logproto.FORWARD},
{`{app="bar"} |= "foo" |~ ".+bar"`, logproto.BACKWARD},
{`rate({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD},
{`rate({app="foo"}[30s])`, logproto.FORWARD},
{`count_over_time({app="foo"} |~".+bar" [1m])`, logproto.BACKWARD},
{`count_over_time(({app="foo"} |~".+bar")[5m])`, logproto.BACKWARD},
{`avg(count_over_time({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
{`min(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
{`max by (app) (rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
{`max(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
{`sum(rate({app=~"foo|bar"} |~".+bar" [1m]))`, logproto.FORWARD},
{`sum(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) by (app)`, logproto.FORWARD},
{`count(count_over_time({app=~"foo|bar"} |~".+bar" [1m])) without (app)`, logproto.FORWARD},
{`stdvar without (app) (count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD},
{`stddev(count_over_time(({app=~"foo|bar"} |~".+bar")[1m])) `, logproto.FORWARD},
{`rate(({app=~"foo|bar"} |~".+bar")[1m])`, logproto.FORWARD},
{`topk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
{`topk(1,rate(({app=~"foo|bar"} |~".+bar")[1m])) by (app)`, logproto.FORWARD},
{`bottomk(2,rate(({app=~"foo|bar"} |~".+bar")[1m]))`, logproto.FORWARD},
{`bottomk(3,rate(({app=~"foo|bar"} |~".+bar")[1m])) without (app)`, logproto.FORWARD},
} {
q := eng.NewRangeQuery(querier, test.qs, start, end, 60*time.Second, test.direction, 1000)
res, err := q.Exec(context.Background())
if err != nil {
b.Fatal(err)
}
result = res
if res == nil {
b.Fatal("unexpected nil result")
}
}
}
}

func getLocalQuerier(size int64) Querier {
iters := []iter.EntryIterator{
iter.NewStreamIterator(newStream(size, identity, `{app="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="foo",bar="fuzz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="foo"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="bazz"}`)),
iter.NewStreamIterator(newStream(size, identity, `{app="bar",bar="fuzz"}`)),
}
return QuerierFunc(func(ctx context.Context, p SelectParams) (iter.EntryIterator, error) {
return iter.NewHeapIterator(iters, p.Direction), nil
})
}

type querierRecorder struct {
source map[string][]*logproto.Stream
}
Expand Down
52 changes: 40 additions & 12 deletions pkg/logql/range_vector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package logql

import (
"sync"

"github.com/grafana/loki/pkg/iter"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
)

Expand All @@ -22,6 +25,7 @@ type rangeVectorIterator struct {
iter iter.PeekingEntryIterator
selRange, step, end, current int64
window map[string]*promql.Series
metrics map[string]labels.Labels
}

func newRangeVectorIterator(
Expand All @@ -38,6 +42,7 @@ func newRangeVectorIterator(
selRange: selRange,
current: start - step, // first loop iteration will set it to start
window: map[string]*promql.Series{},
metrics: map[string]labels.Labels{},
}
}

Expand Down Expand Up @@ -73,7 +78,9 @@ func (r *rangeVectorIterator) popBack(newStart int64) {
}
r.window[fp].Points = r.window[fp].Points[lastPoint+1:]
if len(r.window[fp].Points) == 0 {
s := r.window[fp]
delete(r.window, fp)
putSeries(s)
}
}
}
Expand All @@ -95,15 +102,25 @@ func (r *rangeVectorIterator) load(start, end int64) {
var ok bool
series, ok = r.window[lbs]
if !ok {
series = &promql.Series{
Points: []promql.Point{},
var metric labels.Labels
if metric, ok = r.metrics[lbs]; !ok {
var err error
metric, err = promql.ParseMetric(lbs)
if err != nil {
continue
}
r.metrics[lbs] = metric
}

series = getSeries()
series.Metric = metric
r.window[lbs] = series
}
series.Points = append(series.Points, promql.Point{
p := promql.Point{
T: entry.Timestamp.UnixNano(),
V: 1,
})
}
series.Points = append(series.Points, p)
_ = r.iter.Next()
}
}
Expand All @@ -112,20 +129,31 @@ func (r *rangeVectorIterator) At(aggregator RangeVectorAggregator) (int64, promq
result := make([]promql.Sample, 0, len(r.window))
// convert ts from nano to milli seconds as the iterator work with nanoseconds
ts := r.current / 1e+6
for lbs, series := range r.window {
labels, err := promql.ParseMetric(lbs)
if err != nil {
continue
}

for _, series := range r.window {
result = append(result, promql.Sample{
Point: promql.Point{
V: aggregator(ts, series.Points),
T: ts,
},
Metric: labels,
Metric: series.Metric,
})

}
return ts, result
}

var seriesPool sync.Pool

func getSeries() *promql.Series {
if r := seriesPool.Get(); r != nil {
s := r.(*promql.Series)
s.Points = s.Points[:0]
return s
}
return &promql.Series{
Points: make([]promql.Point, 0, 1024),
}
}

func putSeries(s *promql.Series) {
seriesPool.Put(s)
}
8 changes: 7 additions & 1 deletion pkg/storage/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ func printHeap(b *testing.B, show bool) {
}

func getLocalStore() Store {
limits, err := validation.NewOverrides(validation.Limits{
MaxQueryLength: 6000 * time.Hour,
})
if err != nil {
panic(err)
}
store, err := NewStore(Config{
Config: storage.Config{
BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
Expand All @@ -158,7 +164,7 @@ func getLocalStore() Store {
},
},
},
}, &validation.Overrides{})
}, limits)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 1fc0ffd

Please sign in to comment.