From d89d82aaa77deea52efbc7ccfe037ee3eca44988 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=B2=20Harry=20=F0=9F=8C=8A=20John=20=F0=9F=8F=94?= Date: Tue, 7 Jan 2025 08:39:00 -0800 Subject: [PATCH] Refactoring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 🌲 Harry 🌊 John 🏔 --- engine/engine.go | 4 +- engine/explain.go | 31 +++--- execution/aggregate/count_values.go | 6 +- execution/aggregate/hashaggregate.go | 6 +- execution/aggregate/khashaggregate.go | 6 +- execution/binary/scalar.go | 6 +- execution/binary/vector.go | 6 +- execution/exchange/coalesce.go | 6 +- execution/exchange/concurrent.go | 6 +- execution/exchange/dedup.go | 6 +- execution/exchange/duplicate_label.go | 6 +- execution/function/absent.go | 6 +- execution/function/histogram.go | 6 +- execution/function/noarg.go | 4 +- execution/function/operator.go | 8 +- execution/function/relabel.go | 6 +- execution/function/scalar.go | 6 +- execution/function/timestamp.go | 6 +- execution/model/operator.go | 113 ------------------- execution/remote/operator.go | 6 +- execution/scan/literal_selector.go | 6 +- execution/scan/subquery.go | 6 +- execution/step_invariant/step_invariant.go | 6 +- execution/telemetry/telemetry.go | 119 +++++++++++++++++++++ execution/unary/unary.go | 6 +- storage/prometheus/matrix_selector.go | 6 +- storage/prometheus/vector_selector.go | 6 +- 27 files changed, 229 insertions(+), 176 deletions(-) create mode 100644 execution/telemetry/telemetry.go diff --git a/engine/engine.go b/engine/engine.go index 152947e0..c3cd7552 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -10,6 +10,8 @@ import ( "sort" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -522,7 +524,7 @@ func (q *Query) Explain() *ExplainOutputNode { } func (q *Query) Analyze() *AnalyzeOutputNode { - if observableRoot, ok := q.exec.(model.ObservableVectorOperator); ok { + if observableRoot, ok := q.exec.(telemetry.ObservableVectorOperator); ok { return analyzeQuery(observableRoot) } return nil diff --git a/engine/explain.go b/engine/explain.go index 4529138a..f0bce2f5 100644 --- a/engine/explain.go +++ b/engine/explain.go @@ -9,6 +9,8 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/thanos-io/promql-engine/logicalplan" ) type ExplainableQuery interface { @@ -19,8 +21,8 @@ type ExplainableQuery interface { } type AnalyzeOutputNode struct { - OperatorTelemetry model.OperatorTelemetry `json:"telemetry,omitempty"` - Children []*AnalyzeOutputNode `json:"children,omitempty"` + OperatorTelemetry telemetry.OperatorTelemetry `json:"telemetry,omitempty"` + Children []*AnalyzeOutputNode `json:"children,omitempty"` once sync.Once totalSamples int64 @@ -62,33 +64,30 @@ func (a *AnalyzeOutputNode) aggregateSamples() { childPeak := child.PeakSamples() a.peakSamples = max(a.peakSamples, childPeak) - if a.OperatorTelemetry.SubQuery() { - // Skip aggregating from subquery to avoid double counting samples from children. - continue - } - if a.OperatorTelemetry.StepInvariant() { - // Children of step invariant operator outputs one step, but they should be counted towards all the steps. + switch a.OperatorTelemetry.LogicalNode().(type) { + case *logicalplan.Subquery: + // Skip aggregating samples for subquery + case *logicalplan.StepInvariantExpr: childSamples := child.TotalSamples() for i := 0; i < len(a.totalSamplesPerStep); i++ { a.totalSamples += childSamples a.totalSamplesPerStep[i] += childSamples } - continue - } - - a.totalSamples += child.TotalSamples() - for i, s := range child.TotalSamplesPerStep() { - a.totalSamplesPerStep[i] += s + default: + a.totalSamples += child.TotalSamples() + for i, s := range child.TotalSamplesPerStep() { + a.totalSamplesPerStep[i] += s + } } } }) } -func analyzeQuery(obsv model.ObservableVectorOperator) *AnalyzeOutputNode { +func analyzeQuery(obsv telemetry.ObservableVectorOperator) *AnalyzeOutputNode { children := obsv.Explain() var childTelemetry []*AnalyzeOutputNode for _, child := range children { - if obsChild, ok := child.(model.ObservableVectorOperator); ok { + if obsChild, ok := child.(telemetry.ObservableVectorOperator); ok { childTelemetry = append(childTelemetry, analyzeQuery(obsChild)) } } diff --git a/execution/aggregate/count_values.go b/execution/aggregate/count_values.go index 754c9dd9..d1b517cd 100644 --- a/execution/aggregate/count_values.go +++ b/execution/aggregate/count_values.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -18,7 +20,7 @@ import ( ) type countValuesOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry pool *model.VectorPool next model.VectorOperator @@ -50,7 +52,7 @@ func NewCountValues(pool *model.VectorPool, next model.VectorOperator, param str by: by, grouping: grouping, } - op.OperatorTelemetry = model.NewTelemetry(op, opts) + op.OperatorTelemetry = telemetry.NewTelemetry(op, opts) return op } diff --git a/execution/aggregate/hashaggregate.go b/execution/aggregate/hashaggregate.go index 9d4fc469..e8d9e8e2 100644 --- a/execution/aggregate/hashaggregate.go +++ b/execution/aggregate/hashaggregate.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/util/annotations" @@ -25,7 +27,7 @@ import ( ) type aggregate struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry next model.VectorOperator paramOp model.VectorOperator @@ -74,7 +76,7 @@ func NewHashAggregate( stepsBatch: opts.StepsBatch, } - a.OperatorTelemetry = model.NewTelemetry(a, opts) + a.OperatorTelemetry = telemetry.NewTelemetry(a, opts) return a, nil } diff --git a/execution/aggregate/khashaggregate.go b/execution/aggregate/khashaggregate.go index 524e7f06..ba775550 100644 --- a/execution/aggregate/khashaggregate.go +++ b/execution/aggregate/khashaggregate.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "golang.org/x/exp/slices" @@ -23,7 +25,7 @@ import ( ) type kAggregate struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry next model.VectorOperator paramOp model.VectorOperator @@ -78,7 +80,7 @@ func NewKHashAggregate( params: make([]float64, opts.StepsBatch), } - op.OperatorTelemetry = model.NewTelemetry(op, opts) + op.OperatorTelemetry = telemetry.NewTelemetry(op, opts) return op, nil } diff --git a/execution/binary/scalar.go b/execution/binary/scalar.go index 1a411175..b8430252 100644 --- a/execution/binary/scalar.go +++ b/execution/binary/scalar.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -28,7 +30,7 @@ const ( // scalarOperator evaluates expressions where one operand is a scalarOperator. type scalarOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry seriesOnce sync.Once series []labels.Labels @@ -84,7 +86,7 @@ func NewScalar( bothScalars: scalarSide == ScalarSideBoth, } - oper.OperatorTelemetry = model.NewTelemetry(op, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(op, opts) return oper, nil diff --git a/execution/binary/vector.go b/execution/binary/vector.go index e2617815..585b39a4 100644 --- a/execution/binary/vector.go +++ b/execution/binary/vector.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/cespare/xxhash/v2" "github.com/efficientgo/core/errors" "github.com/zhangyunhao116/umap" @@ -55,7 +57,7 @@ type vectorOperator struct { // If true then 1/0 needs to be returned instead of the value. returnBool bool - model.OperatorTelemetry + telemetry.OperatorTelemetry } func NewVectorOperator( @@ -77,7 +79,7 @@ func NewVectorOperator( sigFunc: signatureFunc(matching.On, matching.MatchingLabels...), } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper, nil } diff --git a/execution/exchange/coalesce.go b/execution/exchange/coalesce.go index 3145f958..93a285f0 100644 --- a/execution/exchange/coalesce.go +++ b/execution/exchange/coalesce.go @@ -10,6 +10,8 @@ import ( "sync/atomic" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -34,7 +36,7 @@ func (c errorChan) getError() error { // coalesce guarantees that samples from different input vectors will be added to the output in the same order // as the input vectors themselves are provided in NewCoalesce. type coalesce struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry once sync.Once series []labels.Labels @@ -59,7 +61,7 @@ func NewCoalesce(pool *model.VectorPool, opts *query.Options, batchSize int64, o batchSize: batchSize, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/exchange/concurrent.go b/execution/exchange/concurrent.go index b55058fa..62baeaac 100644 --- a/execution/exchange/concurrent.go +++ b/execution/exchange/concurrent.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/query" @@ -25,7 +27,7 @@ type concurrencyOperator struct { next model.VectorOperator buffer chan maybeStepVector bufferSize int - model.OperatorTelemetry + telemetry.OperatorTelemetry } func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Options) model.VectorOperator { @@ -35,7 +37,7 @@ func NewConcurrent(next model.VectorOperator, bufferSize int, opts *query.Option bufferSize: bufferSize, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/exchange/dedup.go b/execution/exchange/dedup.go index 1ae20d83..fceadca0 100644 --- a/execution/exchange/dedup.go +++ b/execution/exchange/dedup.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -31,7 +33,7 @@ type dedupCache []dedupSample // if multiple samples with the same ID are present in a StepVector, dedupOperator // will keep the last sample in that vector. type dedupOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry once sync.Once series []labels.Labels @@ -48,7 +50,7 @@ func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator, opts *q next: next, pool: pool, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/exchange/duplicate_label.go b/execution/exchange/duplicate_label.go index 54f47668..bb583493 100644 --- a/execution/exchange/duplicate_label.go +++ b/execution/exchange/duplicate_label.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -18,7 +20,7 @@ import ( type pair struct{ a, b int } type duplicateLabelCheckOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry once sync.Once next model.VectorOperator @@ -31,7 +33,7 @@ func NewDuplicateLabelCheck(next model.VectorOperator, opts *query.Options) mode oper := &duplicateLabelCheckOperator{ next: next, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/function/absent.go b/execution/function/absent.go index 0d0c152b..de49b80d 100644 --- a/execution/function/absent.go +++ b/execution/function/absent.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -16,7 +18,7 @@ import ( ) type absentOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry once sync.Once funcExpr *logicalplan.FunctionCall @@ -36,7 +38,7 @@ func newAbsentOperator( pool: pool, next: next, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/function/histogram.go b/execution/function/histogram.go index 2dbd775f..1a7485e4 100644 --- a/execution/function/histogram.go +++ b/execution/function/histogram.go @@ -11,6 +11,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/cespare/xxhash/v2" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser/posrange" @@ -31,7 +33,7 @@ type histogramSeries struct { // histogramOperator is a function operator that calculates percentiles. type histogramOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry once sync.Once series []labels.Labels @@ -66,7 +68,7 @@ func newHistogramOperator( vectorOp: vectorOp, scalarPoints: make([]float64, opts.StepsBatch), } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/function/noarg.go b/execution/function/noarg.go index 79fcc863..5fd37b64 100644 --- a/execution/function/noarg.go +++ b/execution/function/noarg.go @@ -7,6 +7,8 @@ import ( "context" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -14,7 +16,7 @@ import ( ) type noArgFunctionOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry mint int64 maxt int64 diff --git a/execution/function/operator.go b/execution/function/operator.go index 1741d839..99ac585f 100644 --- a/execution/function/operator.go +++ b/execution/function/operator.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" @@ -66,7 +68,7 @@ func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch in call: call, vectorPool: model.NewVectorPool(stepsBatch), } - op.OperatorTelemetry = model.NewTelemetry(op, opts) + op.OperatorTelemetry = telemetry.NewTelemetry(op, opts) switch funcExpr.Func.Name { case "pi", "time": @@ -82,7 +84,7 @@ func newNoArgsFunctionOperator(funcExpr *logicalplan.FunctionCall, stepsBatch in // functionOperator returns []model.StepVector after processing input with desired function. type functionOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry funcExpr *logicalplan.FunctionCall series []labels.Labels @@ -112,7 +114,7 @@ func newInstantVectorFunctionOperator(funcExpr *logicalplan.FunctionCall, nextOp vectorIndex: 0, scalarPoints: scalarPoints, } - f.OperatorTelemetry = model.NewTelemetry(f, opts) + f.OperatorTelemetry = telemetry.NewTelemetry(f, opts) for i := range funcExpr.Args { if funcExpr.Args[i].ReturnType() == parser.ValueTypeVector { diff --git a/execution/function/relabel.go b/execution/function/relabel.go index 2e2d8500..9e747a2e 100644 --- a/execution/function/relabel.go +++ b/execution/function/relabel.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" prommodel "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -20,7 +22,7 @@ import ( ) type relabelOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry next model.VectorOperator funcExpr *logicalplan.FunctionCall @@ -37,7 +39,7 @@ func newRelabelOperator( next: next, funcExpr: funcExpr, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/function/scalar.go b/execution/function/scalar.go index 8cd1c84a..c67b77d4 100644 --- a/execution/function/scalar.go +++ b/execution/function/scalar.go @@ -8,6 +8,8 @@ import ( "math" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -17,7 +19,7 @@ import ( type scalarOperator struct { pool *model.VectorPool next model.VectorOperator - model.OperatorTelemetry + telemetry.OperatorTelemetry } func newScalarOperator(pool *model.VectorPool, next model.VectorOperator, opts *query.Options) *scalarOperator { @@ -26,7 +28,7 @@ func newScalarOperator(pool *model.VectorPool, next model.VectorOperator, opts * next: next, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/function/timestamp.go b/execution/function/timestamp.go index 41182b89..443f9c76 100644 --- a/execution/function/timestamp.go +++ b/execution/function/timestamp.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -17,7 +19,7 @@ import ( type timestampOperator struct { next model.VectorOperator - model.OperatorTelemetry + telemetry.OperatorTelemetry series []labels.Labels once sync.Once @@ -27,7 +29,7 @@ func newTimestampOperator(next model.VectorOperator, opts *query.Options) *times oper := ×tampOperator{ next: next, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/model/operator.go b/execution/model/operator.go index 2bdf44e4..12f82d66 100644 --- a/execution/model/operator.go +++ b/execution/model/operator.go @@ -6,123 +6,10 @@ package model import ( "context" "fmt" - "time" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/util/stats" - - "github.com/thanos-io/promql-engine/query" ) -type OperatorTelemetry interface { - fmt.Stringer - - AddExecutionTimeTaken(time.Duration) - ExecutionTimeTaken() time.Duration - IncrementSamplesAtTimestamp(samples int, t int64) - Samples() *stats.QuerySamples - SubQuery() bool - StepInvariant() bool -} - -func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { - if opts.EnableAnalysis { - return NewTrackedTelemetry(operator, opts, false, false) - } - return NewNoopTelemetry(operator) -} - -func NewSubqueryTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { - if opts.EnableAnalysis { - return NewTrackedTelemetry(operator, opts, true, false) - } - return NewNoopTelemetry(operator) -} - -func NewInvariantTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { - if opts.EnableAnalysis { - return NewTrackedTelemetry(operator, opts, false, true) - } - return NewNoopTelemetry(operator) -} - -type NoopTelemetry struct { - fmt.Stringer -} - -func NewNoopTelemetry(operator fmt.Stringer) *NoopTelemetry { - return &NoopTelemetry{Stringer: operator} -} - -func (tm *NoopTelemetry) AddExecutionTimeTaken(t time.Duration) {} - -func (tm *NoopTelemetry) ExecutionTimeTaken() time.Duration { - return time.Duration(0) -} - -func (tm *NoopTelemetry) IncrementSamplesAtTimestamp(_ int, _ int64) {} - -func (tm *NoopTelemetry) Samples() *stats.QuerySamples { return nil } -func (tm *NoopTelemetry) SubQuery() bool { return false } -func (tm *NoopTelemetry) StepInvariant() bool { return false } - -type TrackedTelemetry struct { - fmt.Stringer - - ExecutionTime time.Duration - LoadedSamples *stats.QuerySamples - subquery bool - invariant bool -} - -func NewTrackedTelemetry(operator fmt.Stringer, opts *query.Options, subquery bool, invariant bool) *TrackedTelemetry { - ss := stats.NewQuerySamples(opts.EnablePerStepStats) - ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), stepTrackingInterval(opts.Step)) - return &TrackedTelemetry{ - Stringer: operator, - LoadedSamples: ss, - subquery: subquery, - invariant: invariant, - } -} - -func stepTrackingInterval(step time.Duration) int64 { - if step == 0 { - return 1 - } - return int64(step / (time.Millisecond / time.Nanosecond)) -} - -func (ti *TrackedTelemetry) AddExecutionTimeTaken(t time.Duration) { ti.ExecutionTime += t } - -func (ti *TrackedTelemetry) ExecutionTimeTaken() time.Duration { - return ti.ExecutionTime -} - -func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) { - ti.updatePeak(samples) - ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples)) -} - -func (ti *TrackedTelemetry) SubQuery() bool { - return ti.subquery -} - -func (ti *TrackedTelemetry) StepInvariant() bool { - return ti.invariant -} - -func (ti *TrackedTelemetry) updatePeak(samples int) { - ti.LoadedSamples.UpdatePeak(samples) -} - -func (ti *TrackedTelemetry) Samples() *stats.QuerySamples { return ti.LoadedSamples } - -type ObservableVectorOperator interface { - VectorOperator - OperatorTelemetry -} - // VectorOperator performs operations on series in step by step fashion. type VectorOperator interface { // Next yields vectors of samples from all series for one or more execution steps. diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 35f290dd..761e6981 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" @@ -27,7 +29,7 @@ type Execution struct { opts *query.Options queryRangeStart time.Time vectorSelector model.VectorOperator - model.OperatorTelemetry + telemetry.OperatorTelemetry } func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution { @@ -40,7 +42,7 @@ func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart ti vectorSelector: promstorage.NewVectorSelector(pool, storage, opts, 0, 0, false, 0, 1), } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/scan/literal_selector.go b/execution/scan/literal_selector.go index 7a03706e..d6123d0b 100644 --- a/execution/scan/literal_selector.go +++ b/execution/scan/literal_selector.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -28,7 +30,7 @@ type numberLiteralSelector struct { once sync.Once val float64 - model.OperatorTelemetry + telemetry.OperatorTelemetry } func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val float64) *numberLiteralSelector { @@ -42,7 +44,7 @@ func NewNumberLiteralSelector(pool *model.VectorPool, opts *query.Options, val f val: val, } - oper.OperatorTelemetry = model.NewTelemetry(oper, opts) + oper.OperatorTelemetry = telemetry.NewTelemetry(oper, opts) return oper } diff --git a/execution/scan/subquery.go b/execution/scan/subquery.go index a5a1f359..50bc1fc7 100644 --- a/execution/scan/subquery.go +++ b/execution/scan/subquery.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "github.com/thanos-io/promql-engine/execution/model" @@ -20,7 +22,7 @@ import ( ) type subqueryOperator struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry next model.VectorOperator paramOp model.VectorOperator @@ -74,7 +76,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera lastCollected: -1, params: make([]float64, opts.StepsBatch), } - o.OperatorTelemetry = model.NewSubqueryTelemetry(o, opts) + o.OperatorTelemetry = telemetry.NewSubqueryTelemetry(o, opts) return o, nil } diff --git a/execution/step_invariant/step_invariant.go b/execution/step_invariant/step_invariant.go index 06aebbe4..65809f4d 100644 --- a/execution/step_invariant/step_invariant.go +++ b/execution/step_invariant/step_invariant.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -31,7 +33,7 @@ type stepInvariantOperator struct { step int64 currentStep int64 stepsBatch int - model.OperatorTelemetry + telemetry.OperatorTelemetry } func (u *stepInvariantOperator) Explain() (next []model.VectorOperator) { @@ -59,7 +61,7 @@ func NewStepInvariantOperator( stepsBatch: opts.StepsBatch, cacheResult: true, } - u.OperatorTelemetry = model.NewInvariantTelemetry(u, opts) + u.OperatorTelemetry = telemetry.NewStepInvariantTelemetry(u, opts) if u.step == 0 { u.step = 1 } diff --git a/execution/telemetry/telemetry.go b/execution/telemetry/telemetry.go new file mode 100644 index 00000000..b335a573 --- /dev/null +++ b/execution/telemetry/telemetry.go @@ -0,0 +1,119 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package telemetry + +import ( + "fmt" + "time" + + "github.com/prometheus/prometheus/util/stats" + + "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/promql-engine/query" +) + +type OperatorTelemetry interface { + fmt.Stringer + + AddExecutionTimeTaken(time.Duration) + ExecutionTimeTaken() time.Duration + IncrementSamplesAtTimestamp(samples int, t int64) + Samples() *stats.QuerySamples + LogicalNode() logicalplan.Node +} + +func NewTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { + if opts.EnableAnalysis { + return NewTrackedTelemetry(operator, opts, nil) + } + return NewNoopTelemetry(operator) +} + +func NewSubqueryTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { + if opts.EnableAnalysis { + return NewTrackedTelemetry(operator, opts, &logicalplan.Subquery{}) + } + return NewNoopTelemetry(operator) +} + +func NewStepInvariantTelemetry(operator fmt.Stringer, opts *query.Options) OperatorTelemetry { + if opts.EnableAnalysis { + return NewTrackedTelemetry(operator, opts, &logicalplan.StepInvariantExpr{}) + } + return NewNoopTelemetry(operator) +} + +type NoopTelemetry struct { + fmt.Stringer +} + +func NewNoopTelemetry(operator fmt.Stringer) *NoopTelemetry { + return &NoopTelemetry{Stringer: operator} +} + +func (tm *NoopTelemetry) AddExecutionTimeTaken(t time.Duration) {} + +func (tm *NoopTelemetry) ExecutionTimeTaken() time.Duration { + return time.Duration(0) +} + +func (tm *NoopTelemetry) IncrementSamplesAtTimestamp(_ int, _ int64) {} + +func (tm *NoopTelemetry) Samples() *stats.QuerySamples { return nil } + +func (tm *NoopTelemetry) LogicalNode() logicalplan.Node { + return nil +} + +type TrackedTelemetry struct { + fmt.Stringer + + ExecutionTime time.Duration + LoadedSamples *stats.QuerySamples + logicalNode logicalplan.Node +} + +func NewTrackedTelemetry(operator fmt.Stringer, opts *query.Options, logicalPlanNode logicalplan.Node) *TrackedTelemetry { + ss := stats.NewQuerySamples(opts.EnablePerStepStats) + ss.InitStepTracking(opts.Start.UnixMilli(), opts.End.UnixMilli(), stepTrackingInterval(opts.Step)) + return &TrackedTelemetry{ + Stringer: operator, + LoadedSamples: ss, + logicalNode: logicalPlanNode, + } +} + +func stepTrackingInterval(step time.Duration) int64 { + if step == 0 { + return 1 + } + return int64(step / (time.Millisecond / time.Nanosecond)) +} + +func (ti *TrackedTelemetry) AddExecutionTimeTaken(t time.Duration) { ti.ExecutionTime += t } + +func (ti *TrackedTelemetry) ExecutionTimeTaken() time.Duration { + return ti.ExecutionTime +} + +func (ti *TrackedTelemetry) IncrementSamplesAtTimestamp(samples int, t int64) { + ti.updatePeak(samples) + ti.LoadedSamples.IncrementSamplesAtTimestamp(t, int64(samples)) +} + +func (ti *TrackedTelemetry) LogicalNode() logicalplan.Node { + return ti.logicalNode +} + +func (ti *TrackedTelemetry) updatePeak(samples int) { + ti.LoadedSamples.UpdatePeak(samples) +} + +func (ti *TrackedTelemetry) Samples() *stats.QuerySamples { return ti.LoadedSamples } + +type ObservableVectorOperator interface { + model.VectorOperator + OperatorTelemetry +} diff --git a/execution/unary/unary.go b/execution/unary/unary.go index b56811a1..950972a4 100644 --- a/execution/unary/unary.go +++ b/execution/unary/unary.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/prometheus/prometheus/model/labels" "gonum.org/v1/gonum/floats" @@ -20,14 +22,14 @@ type unaryNegation struct { once sync.Once series []labels.Labels - model.OperatorTelemetry + telemetry.OperatorTelemetry } func NewUnaryNegation(next model.VectorOperator, opts *query.Options) (model.VectorOperator, error) { u := &unaryNegation{ next: next, } - u.OperatorTelemetry = model.NewTelemetry(u, opts) + u.OperatorTelemetry = telemetry.NewTelemetry(u, opts) return u, nil } diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index c86b73c7..1d306ad1 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" @@ -34,7 +36,7 @@ type matrixScanner struct { } type matrixSelector struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry vectorPool *model.VectorPool storage SeriesSelector @@ -109,7 +111,7 @@ func NewMatrixSelector( extLookbackDelta: opts.ExtLookbackDelta.Milliseconds(), } - m.OperatorTelemetry = model.NewTelemetry(m, opts) + m.OperatorTelemetry = telemetry.NewTelemetry(m, opts) // For instant queries, set the step to a positive value // so that the operator can terminate. diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 2cc88fcd..62d619d2 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/thanos-io/promql-engine/execution/telemetry" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" @@ -28,7 +30,7 @@ type vectorScanner struct { } type vectorSelector struct { - model.OperatorTelemetry + telemetry.OperatorTelemetry storage SeriesSelector scanners []vectorScanner @@ -82,7 +84,7 @@ func NewVectorSelector( selectTimestamp: selectTimestamp, } - o.OperatorTelemetry = model.NewTelemetry(o, queryOpts) + o.OperatorTelemetry = telemetry.NewTelemetry(o, queryOpts) // For instant queries, set the step to a positive value // so that the operator can terminate.