Skip to content

Commit

Permalink
Optimize analysis calculation (#495)
Browse files Browse the repository at this point in the history
* Optimize analysis calculation

Aggregating analyzed results is slow for complex expressions
because we aggregate samples over and over again.

This commit speeds that up by memoizing aggregates.

Signed-off-by: Filip Petkovski <[email protected]>

* Remove stray change

Signed-off-by: Filip Petkovski <[email protected]>

* Fix lint

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Nov 6, 2024
1 parent 30e7a9f commit 097e6e9
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 44 deletions.
79 changes: 37 additions & 42 deletions engine/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package engine

import (
"sync"

"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/promql-engine/execution/model"
Expand All @@ -18,7 +20,12 @@ type ExplainableQuery interface {

// AnalyzeOutputNode is one node of the analysis tree: the telemetry of a single
// operator, the nodes of its children, and unexported fields that cache the
// sample aggregates computed by aggregateSamples.
//
// NOTE(review): this span comes from a rendered diff without +/- markers, so
// the two Children lines are presumably the removed declaration (slice of
// values) followed by the added one (slice of pointers) — confirm against the
// repository.
type AnalyzeOutputNode struct {
OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"`
Children []AnalyzeOutputNode `json:"children,omitempty"`
// Children are held by pointer; a node contains a sync.Once, which must not be
// copied after first use.
Children []*AnalyzeOutputNode `json:"children,omitempty"`

// once makes aggregateSamples run its computation a single time per node.
once sync.Once
// The fields below are written inside once.Do and returned by TotalSamples,
// PeakSamples and TotalSamplesPerStep respectively.
totalSamples int64
peakSamples int64
totalSamplesPerStep []int64
}

type ExplainOutputNode struct {
Expand All @@ -29,60 +36,48 @@ type ExplainOutputNode struct {
var _ ExplainableQuery = &compatibilityQuery{}

// TotalSamples returns the number of samples attributed to this node plus,
// unless the node is a subquery, the totals of its children.
//
// NOTE(review): this span comes from a rendered diff without +/- markers. The
// statements up to and including `return total` appear to be the removed
// implementation; the last two statements appear to be the added one.
func (a *AnalyzeOutputNode) TotalSamples() int64 {
var total int64
if a.OperatorTelemetry.Samples() != nil {
total += a.OperatorTelemetry.Samples().TotalSamples
}
if a.OperatorTelemetry.SubQuery() {
// Returning here to avoid double counting samples from children of subquery.
return total
}

for _, child := range a.Children {
// child.TotalSamples() is evaluated here and again two lines below, so every
// child with a positive count has its whole subtree walked twice.
c := child.TotalSamples()
if c > 0 {
total += child.TotalSamples()
}
}

return total
// Memoized path: aggregateSamples fills a.totalSamples inside a.once.Do, so the
// tree is aggregated only on the first call.
a.aggregateSamples()
return a.totalSamples
}

// TotalSamplesPerStep returns the per-step sample counts of this node with the
// per-step counts of its children added element by element.
//
// NOTE(review): this span comes from a rendered diff without +/- markers. The
// statements up to and including `return total` appear to be the removed
// implementation; the last two statements appear to be the added one.
func (a *AnalyzeOutputNode) TotalSamplesPerStep() []int64 {
if a.OperatorTelemetry.Samples() == nil {
return []int64{}
}

// total is the telemetry's slice itself, not a copy, so the += below also
// modifies Samples().TotalSamplesPerStep.
// NOTE(review): if Samples() hands back the same slice on every call, each
// call adds the children's values again — confirm.
total := a.OperatorTelemetry.Samples().TotalSamplesPerStep
for _, child := range a.Children {
for i, s := range child.TotalSamplesPerStep() {
total[i] += s
}
}

return total
// Memoized path: aggregateSamples fills a.totalSamplesPerStep inside
// a.once.Do. When Samples() is nil that field is never assigned, so this
// returns a nil slice.
a.aggregateSamples()
return a.totalSamplesPerStep
}

// PeakSamples returns the largest peak sample count found on this node or any
// of its descendants.
//
// NOTE(review): this span comes from a rendered diff without +/- markers. The
// statements from `var peak int64` through `peak = childPeak` appear to be the
// beginning of the removed implementation (its closing lines are not in this
// span, which is why the braces do not balance); the last two statements
// appear to be the added implementation.
func (a *AnalyzeOutputNode) PeakSamples() int64 {
var peak int64
if a.OperatorTelemetry.Samples() != nil {
peak = int64(a.OperatorTelemetry.Samples().PeakSamples)
}
for _, child := range a.Children {
childPeak := child.PeakSamples()
if childPeak > peak {
peak = childPeak
// Memoized path: aggregateSamples fills a.peakSamples inside a.once.Do.
a.aggregateSamples()
return a.peakSamples
}

// aggregateSamples computes totalSamples, peakSamples and totalSamplesPerStep
// for this node from its own telemetry and from its children. The whole body
// runs inside a.once.Do, so only the first call does any work; later calls
// return immediately and the accessors read the stored fields.
func (a *AnalyzeOutputNode) aggregateSamples() {
a.once.Do(func() {
if nodeSamples := a.OperatorTelemetry.Samples(); nodeSamples != nil {
a.totalSamples += nodeSamples.TotalSamples
a.peakSamples += int64(nodeSamples.PeakSamples)
// Slice assignment, not a copy: the per-step additions in the loop below
// write into nodeSamples.TotalSamplesPerStep as well.
a.totalSamplesPerStep = nodeSamples.TotalSamplesPerStep
}
// NOTE(review): the next two lines (`}` and `return peak`) look like the tail
// of the removed PeakSamples body that the rendered diff interleaved here;
// they are presumably not part of this function — confirm against the
// repository.
}
return peak

for _, child := range a.Children {
// Calling the child's accessors triggers the child's own once-guarded
// aggregation, so each node in the tree is aggregated a single time.
childPeak := child.PeakSamples()
a.peakSamples = max(a.peakSamples, childPeak)
// NOTE(review): if Samples() was nil above, a.totalSamplesPerStep is still
// nil here and the indexed write would panic for any child that reports
// per-step samples; the same holds if a child has more steps than this
// node — confirm neither can happen.
for i, s := range child.TotalSamplesPerStep() {
a.totalSamplesPerStep[i] += s
}
// Aggregate only if the node is not a subquery to avoid double counting samples from children.
if !a.OperatorTelemetry.SubQuery() {
a.totalSamples += child.TotalSamples()
}
}
})
}

func analyzeQuery(obsv model.ObservableVectorOperator) *AnalyzeOutputNode {
children := obsv.Explain()
var childTelemetry []AnalyzeOutputNode
var childTelemetry []*AnalyzeOutputNode
for _, child := range children {
if obsChild, ok := child.(model.ObservableVectorOperator); ok {
childTelemetry = append(childTelemetry, *analyzeQuery(obsChild))
childTelemetry = append(childTelemetry, analyzeQuery(obsChild))
}
}

Expand Down
4 changes: 2 additions & 2 deletions engine/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func assertExecutionTimeNonZero(t *testing.T, got *engine.AnalyzeOutputNode) boo
}
for i := range got.Children {
child := got.Children[i]
return got.OperatorTelemetry.ExecutionTimeTaken() > 0 && assertExecutionTimeNonZero(t, &child)
return got.OperatorTelemetry.ExecutionTimeTaken() > 0 && assertExecutionTimeNonZero(t, child)
}
}
return true
Expand Down Expand Up @@ -256,7 +256,7 @@ func renderAnalysisTree(node *engine.AnalyzeOutputNode, level int) string {

result.WriteString(fmt.Sprintf("%s: %d peak: %d\n", node.OperatorTelemetry.String(), totalSamples, peakSamples))
for _, child := range node.Children {
result.WriteString(renderAnalysisTree(&child, level+1))
result.WriteString(renderAnalysisTree(child, level+1))
}

return result.String()
Expand Down

0 comments on commit 097e6e9

Please sign in to comment.