From 1c3604ea694002e3e307b1b8124811e09eb505e9 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 13 Jan 2025 16:57:39 +0900 Subject: [PATCH] Add histogram address logic and warnings to some functions Signed-off-by: SungJin1212 --- execution/function/functions.go | 51 ++++++++++++++++++ ringbuffer/functions.go | 92 +++++++++++++++++++++++++++++++-- 2 files changed, 138 insertions(+), 5 deletions(-) diff --git a/execution/function/functions.go b/execution/function/functions.go index a26546dd..c17e56ad 100644 --- a/execution/function/functions.go +++ b/execution/function/functions.go @@ -53,6 +53,10 @@ var instantVectorFuncs = map[string]functionCall{ return sign }), "round": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + if len(vargs) > 1 { return 0., false } @@ -71,6 +75,10 @@ var instantVectorFuncs = map[string]functionCall{ return f, true }, "clamp": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + if len(vargs) != 2 { return 0., false } @@ -86,6 +94,10 @@ var instantVectorFuncs = map[string]functionCall{ return math.Max(min, math.Min(max, v)), true }, "clamp_min": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + if len(vargs) != 1 { return 0., false } @@ -96,6 +108,10 @@ var instantVectorFuncs = map[string]functionCall{ return math.Max(min, v), true }, "clamp_max": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + if len(vargs) != 1 { return 0., false } @@ -131,27 +147,59 @@ var instantVectorFuncs = map[string]functionCall{ }, // variants of date time functions with an argument "days_in_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return daysInMonth(dateFromSampleValue(f)), true }, "day_of_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return dayOfMonth(dateFromSampleValue(f)), true }, "day_of_week": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return dayOfWeek(dateFromSampleValue(f)), true }, "day_of_year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return dayOfYear(dateFromSampleValue(f)), true }, "hour": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return hour(dateFromSampleValue(f)), true }, "minute": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return minute(dateFromSampleValue(f)), true }, "month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return month(dateFromSampleValue(f)), true }, "year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } + return year(dateFromSampleValue(f)), true }, // hack we only have sort functions as argument for "timestamp" possibly so they dont actually @@ -209,6 +257,9 @@ var noArgFuncs = map[string]noArgFunctionCall{ func simpleFunc(f func(float64) float64) functionCall { return func(v float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) { + if h != nil { + return 0., false + } return f(v), true } } diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index 246d2d84..a62c4c8f 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -7,10 +7,13 @@ import ( "context" "math" + "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/util/annotations" "github.com/thanos-io/promql-engine/execution/aggregate" "github.com/thanos-io/promql-engine/execution/parse" + "github.com/thanos-io/promql-engine/execution/warnings" ) type SamplesBuffer GenericRingBuffer @@ -59,9 +62,25 @@ func instantValue(samples []Sample, isRate bool) (float64, bool) { var rangeVectorFuncs = map[string]FunctionCall{ "sum_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return 0., nil, false, nil + return nil, nil, false, nil } - return sumOverTime(f.Samples), nil, true, nil + + if f.Samples[0].V.H != nil { + // histogram + sum := f.Samples[0].V.H.Copy() + for _, sample := range f.Samples[1:] { + h := sample.V.H + _, err := sum.Add(h) + if err != nil { + return nil, sum, true, nil + } + } + + return nil, sum, true, nil + } + + v := sumOverTime(f.Samples) + return &v, nil, true, nil }, "max_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { @@ -79,9 +98,31 @@ var rangeVectorFuncs = map[string]FunctionCall{ }, "avg_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { - return 0., nil, false, nil + return nil, nil, false, nil + } + + if f.Samples[0].V.H != nil { + // histogram + count := 1 + mean := f.Samples[0].V.H.Copy() + for _, sample := range f.Samples[1:] { + count++ + left := sample.V.H.Copy().Div(float64(count)) + right := mean.Copy().Div(float64(count)) + toAdd, err := left.Sub(right) + if err != nil { + return nil, mean, true, nil + } + _, err = mean.Add(toAdd) + if err != nil { + return nil, mean, true, nil + } + } + return nil, mean, true, nil } - return avgOverTime(f.Samples), nil, true, nil + + v := avgOverTime(f.Samples) + return &v, nil, true, nil }, "stddev_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { if len(f.Samples) == 0 { @@ -150,7 +191,14 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) < 2 { return nil, nil, false, nil } - return deriv(f.Samples), nil, true, nil + + if f.Samples[0].V.H != nil { + // deriv should ignore histograms. + return nil, nil, false, nil + } + + v := deriv(f.Samples) + return &v, nil, true, nil }, "irate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) { f.Samples = filterFloatOnlySamples(f.Samples) @@ -459,8 +507,10 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo } prev := points[0].V.H // We already know that this is a histogram. + usingCustomBuckets := prev.UsesCustomBuckets() last := points[len(points)-1].V.H if last == nil { + warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx) return nil, nil // Range contains a mix of histograms and floats. } minSchema := prev.Schema @@ -468,6 +518,17 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo minSchema = last.Schema } + if last.UsesCustomBuckets() != usingCustomBuckets { + warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) + return nil, nil + } + + // We check for gauge type histograms in the loop below, but the loop below does not run on the first and last point, + // so check the first and last point now. + if isCounter && (prev.CounterResetHint == histogram.GaugeType || last.CounterResetHint == histogram.GaugeType) { + warnings.AddToContext(annotations.NativeHistogramNotCounterWarning, ctx) + } + // https://github.com/prometheus/prometheus/blob/ccea61c7bf1e6bce2196ba8189a209945a204c5b/promql/functions.go#L183 // First iteration to find out two things: // - What's the smallest relevant schema? @@ -476,18 +537,31 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo for _, currPoint := range points[1 : len(points)-1] { curr := currPoint.V.H if curr == nil { + warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx) return nil, nil // Range contains a mix of histograms and floats. } if !isCounter { continue } + if curr.CounterResetHint == histogram.GaugeType { + warnings.AddToContext(annotations.NativeHistogramNotCounterWarning, ctx) + } if curr.Schema < minSchema { minSchema = curr.Schema } + if curr.UsesCustomBuckets() != usingCustomBuckets { + warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) + return nil, nil + } } h := last.CopyToSchema(minSchema) if _, err := h.Sub(prev); err != nil { + if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { + warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) + } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { + warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx) + } return nil, err } @@ -497,12 +571,20 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo curr := currPoint.V.H if curr.DetectReset(prev) { if _, err := h.Add(prev); err != nil { + if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) { + warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx) + } else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) { + warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx) + } return nil, err } } prev = curr } + } else if points[0].V.H.CounterResetHint != histogram.GaugeType || points[len(points)-1].V.H.CounterResetHint != histogram.GaugeType { + warnings.AddToContext(annotations.NativeHistogramNotGaugeWarning, ctx) } + h.CounterResetHint = histogram.GaugeType return h.Compact(0), nil }