Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 committed Jan 7, 2025
1 parent 0d3e89f commit d89d82a
Show file tree
Hide file tree
Showing 27 changed files with 229 additions and 176 deletions.
4 changes: 3 additions & 1 deletion engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sort"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -522,7 +524,7 @@ func (q *Query) Explain() *ExplainOutputNode {
}

func (q *Query) Analyze() *AnalyzeOutputNode {
if observableRoot, ok := q.exec.(model.ObservableVectorOperator); ok {
if observableRoot, ok := q.exec.(telemetry.ObservableVectorOperator); ok {
return analyzeQuery(observableRoot)
}
return nil
Expand Down
31 changes: 15 additions & 16 deletions engine/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/prometheus/prometheus/promql"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/telemetry"
"github.com/thanos-io/promql-engine/logicalplan"
)

type ExplainableQuery interface {
Expand All @@ -19,8 +21,8 @@ type ExplainableQuery interface {
}

type AnalyzeOutputNode struct {
OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"`
Children []*AnalyzeOutputNode `json:"children,omitempty"`
OperatorTelemetry telemetry.OperatorTelemetry `json:"telemetry,omitempty"`
Children []*AnalyzeOutputNode `json:"children,omitempty"`

once sync.Once
totalSamples int64
Expand Down Expand Up @@ -62,33 +64,30 @@ func (a *AnalyzeOutputNode) aggregateSamples() {
childPeak := child.PeakSamples()
a.peakSamples = max(a.peakSamples, childPeak)

if a.OperatorTelemetry.SubQuery() {
// Skip aggregating from subquery to avoid double counting samples from children.
continue
}
if a.OperatorTelemetry.StepInvariant() {
// Children of step invariant operator outputs one step, but they should be counted towards all the steps.
switch a.OperatorTelemetry.LogicalNode().(type) {
case *logicalplan.Subquery:
// Skip aggregating samples for subquery
case *logicalplan.StepInvariantExpr:
childSamples := child.TotalSamples()
for i := 0; i < len(a.totalSamplesPerStep); i++ {
a.totalSamples += childSamples
a.totalSamplesPerStep[i] += childSamples
}
continue
}

a.totalSamples += child.TotalSamples()
for i, s := range child.TotalSamplesPerStep() {
a.totalSamplesPerStep[i] += s
default:
a.totalSamples += child.TotalSamples()
for i, s := range child.TotalSamplesPerStep() {
a.totalSamplesPerStep[i] += s
}
}
}
})
}

func analyzeQuery(obsv model.ObservableVectorOperator) *AnalyzeOutputNode {
func analyzeQuery(obsv telemetry.ObservableVectorOperator) *AnalyzeOutputNode {
children := obsv.Explain()
var childTelemetry []*AnalyzeOutputNode
for _, child := range children {
if obsChild, ok := child.(model.ObservableVectorOperator); ok {
if obsChild, ok := child.(telemetry.ObservableVectorOperator); ok {
childTelemetry = append(childTelemetry, analyzeQuery(obsChild))
}
}
Expand Down
6 changes: 4 additions & 2 deletions execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/query"
)

type countValuesOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

pool *model.VectorPool
next model.VectorOperator
Expand Down Expand Up @@ -50,7 +52,7 @@ func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param str
by: by,
grouping: grouping,
}
op.OperatorTelemetry = model.NewTelemetry(op, opts)
op.OperatorTelemetry = telemetry.NewTelemetry(op, opts)

return op
}
Expand Down
6 changes: 4 additions & 2 deletions execution/aggregate/hashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
Expand All @@ -25,7 +27,7 @@ import (
)

type aggregate struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

next model.VectorOperator
paramOp model.VectorOperator
Expand Down Expand Up @@ -74,7 +76,7 @@ func NewHashAggregate(
stepsBatch: opts.StepsBatch,
}

a.OperatorTelemetry = model.NewTelemetry(a, opts)
a.OperatorTelemetry = telemetry.NewTelemetry(a, opts)

return a, nil
}
Expand Down
6 changes: 4 additions & 2 deletions execution/aggregate/khashaggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"golang.org/x/exp/slices"

Expand All @@ -23,7 +25,7 @@ import (
)

type kAggregate struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

next model.VectorOperator
paramOp model.VectorOperator
Expand Down Expand Up @@ -78,7 +80,7 @@ func NewKHashAggregate(
params: make([]float64, opts.StepsBatch),
}

op.OperatorTelemetry = model.NewTelemetry(op, opts)
op.OperatorTelemetry = telemetry.NewTelemetry(op, opts)

return op, nil
}
Expand Down
6 changes: 4 additions & 2 deletions execution/binary/scalar.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

Expand All @@ -28,7 +30,7 @@ const (

// scalarOperator evaluates expressions where one operand is a scalarOperator.
type scalarOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

seriesOnce sync.Once
series []labels.Labels
Expand Down Expand Up @@ -84,7 +86,7 @@ func NewScalar(
bothScalars: scalarSide == ScalarSideBoth,
}

oper.OperatorTelemetry = model.NewTelemetry(op, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(op, opts)

return oper, nil

Expand Down
6 changes: 4 additions & 2 deletions execution/binary/vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/cespare/xxhash/v2"
"github.com/efficientgo/core/errors"
"github.com/zhangyunhao116/umap"
Expand Down Expand Up @@ -55,7 +57,7 @@ type vectorOperator struct {
// If true then 1/0 needs to be returned instead of the value.
returnBool bool

model.OperatorTelemetry
telemetry.OperatorTelemetry
}

func NewVectorOperator(
Expand All @@ -77,7 +79,7 @@ func NewVectorOperator(
sigFunc: signatureFunc(matching.On, matching.MatchingLabels...),
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper, nil
}
Expand Down
6 changes: 4 additions & 2 deletions execution/exchange/coalesce.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"sync/atomic"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/labels"

Expand All @@ -34,7 +36,7 @@ func (c errorChan) getError() error {
// coalesce guarantees that samples from different input vectors will be added to the output in the same order
// as the input vectors themselves are provided in NewCoalesce.
type coalesce struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

once sync.Once
series []labels.Labels
Expand All @@ -59,7 +61,7 @@ func NewCoalesce(pool *model.VectorPool, opts *query.Options, batchSize int64, o
batchSize: batchSize,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper
}
Expand Down
6 changes: 4 additions & 2 deletions execution/exchange/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/query"

Expand All @@ -25,7 +27,7 @@ type concurrencyOperator struct {
next model.VectorOperator
buffer chan maybeStepVector
bufferSize int
model.OperatorTelemetry
telemetry.OperatorTelemetry
}

func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Options) model.VectorOperator {
Expand All @@ -35,7 +37,7 @@ func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Option
bufferSize: bufferSize,
}

oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)
return oper
}

Expand Down
6 changes: 4 additions & 2 deletions execution/exchange/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -31,7 +33,7 @@ type dedupCache []dedupSample
// if multiple samples with the same ID are present in a StepVector, dedupOperator
// will keep the last sample in that vector.
type dedupOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

once sync.Once
series []labels.Labels
Expand All @@ -48,7 +50,7 @@ func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator, opts *q
next: next,
pool: pool,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper
}
Expand Down
6 changes: 4 additions & 2 deletions execution/exchange/duplicate_label.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/execution/model"
Expand All @@ -18,7 +20,7 @@ import (
type pair struct{ a, b int }

type duplicateLabelCheckOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

once sync.Once
next model.VectorOperator
Expand All @@ -31,7 +33,7 @@ func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) mode
oper := &duplicateLabelCheckOperator{
next: next,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper
}
Expand Down
6 changes: 4 additions & 2 deletions execution/function/absent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/execution/model"
Expand All @@ -16,7 +18,7 @@ import (
)

type absentOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

once sync.Once
funcExpr *logicalplan.FunctionCall
Expand All @@ -36,7 +38,7 @@ func newAbsentOperator(
pool: pool,
next: next,
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper
}
Expand Down
6 changes: 4 additions & 2 deletions execution/function/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/cespare/xxhash/v2"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand All @@ -31,7 +33,7 @@ type histogramSeries struct {

// histogramOperator is a function operator that calculates percentiles.
type histogramOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry
once sync.Once
series []labels.Labels

Expand Down Expand Up @@ -66,7 +68,7 @@ func newHistogramOperator(
vectorOp: vectorOp,
scalarPoints: make([]float64, opts.StepsBatch),
}
oper.OperatorTelemetry = model.NewTelemetry(oper, opts)
oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts)

return oper
}
Expand Down
4 changes: 3 additions & 1 deletion execution/function/noarg.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,16 @@ import (
"context"
"time"

"github.com/thanos-io/promql-engine/execution/telemetry"

"github.com/prometheus/prometheus/model/labels"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/logicalplan"
)

type noArgFunctionOperator struct {
model.OperatorTelemetry
telemetry.OperatorTelemetry

mint int64
maxt int64
Expand Down
Loading

0 comments on commit d89d82a

Please sign in to comment.