Skip to content

Commit

Permalink
Add histogram address logic and warnings to some functions
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Jan 13, 2025
1 parent 6d4d34c commit 1c3604e
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 5 deletions.
51 changes: 51 additions & 0 deletions execution/function/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ var instantVectorFuncs = map[string]functionCall{
return sign
}),
"round": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

if len(vargs) > 1 {
return 0., false
}
Expand All @@ -71,6 +75,10 @@ var instantVectorFuncs = map[string]functionCall{
return f, true
},
"clamp": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

if len(vargs) != 2 {
return 0., false
}
Expand All @@ -86,6 +94,10 @@ var instantVectorFuncs = map[string]functionCall{
return math.Max(min, math.Min(max, v)), true
},
"clamp_min": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

if len(vargs) != 1 {
return 0., false
}
Expand All @@ -96,6 +108,10 @@ var instantVectorFuncs = map[string]functionCall{
return math.Max(min, v), true
},
"clamp_max": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

if len(vargs) != 1 {
return 0., false
}
Expand Down Expand Up @@ -131,27 +147,59 @@ var instantVectorFuncs = map[string]functionCall{
},
// variants of date time functions with an argument
"days_in_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return daysInMonth(dateFromSampleValue(f)), true
},
"day_of_month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return dayOfMonth(dateFromSampleValue(f)), true
},
"day_of_week": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return dayOfWeek(dateFromSampleValue(f)), true
},
"day_of_year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return dayOfYear(dateFromSampleValue(f)), true
},
"hour": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return hour(dateFromSampleValue(f)), true
},
"minute": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return minute(dateFromSampleValue(f)), true
},
"month": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return month(dateFromSampleValue(f)), true
},
"year": func(f float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}

return year(dateFromSampleValue(f)), true
},
// hack we only have sort functions as argument for "timestamp" possibly so they dont actually
Expand Down Expand Up @@ -209,6 +257,9 @@ var noArgFuncs = map[string]noArgFunctionCall{

func simpleFunc(f func(float64) float64) functionCall {
return func(v float64, h *histogram.FloatHistogram, vargs ...float64) (float64, bool) {
if h != nil {
return 0., false
}
return f(v), true
}
}
Expand Down
92 changes: 87 additions & 5 deletions ringbuffer/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"context"
"math"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/execution/aggregate"
"github.com/thanos-io/promql-engine/execution/parse"
"github.com/thanos-io/promql-engine/execution/warnings"
)

type SamplesBuffer GenericRingBuffer
Expand Down Expand Up @@ -59,9 +62,25 @@ func instantValue(samples []Sample, isRate bool) (float64, bool) {
var rangeVectorFuncs = map[string]FunctionCall{
"sum_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
if len(f.Samples) == 0 {
return 0., nil, false, nil
return nil, nil, false, nil
}
return sumOverTime(f.Samples), nil, true, nil

if f.Samples[0].V.H != nil {
// histogram
sum := f.Samples[0].V.H.Copy()
for _, sample := range f.Samples[1:] {
h := sample.V.H
_, err := sum.Add(h)
if err != nil {
return nil, sum, true, nil
}
}

return nil, sum, true, nil
}

v := sumOverTime(f.Samples)
return &v, nil, true, nil
},
"max_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
if len(f.Samples) == 0 {
Expand All @@ -79,9 +98,31 @@ var rangeVectorFuncs = map[string]FunctionCall{
},
"avg_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
if len(f.Samples) == 0 {
return 0., nil, false, nil
return nil, nil, false, nil
}

if f.Samples[0].V.H != nil {
// histogram
count := 1
mean := f.Samples[0].V.H.Copy()
for _, sample := range f.Samples[1:] {
count++
left := sample.V.H.Copy().Div(float64(count))
right := mean.Copy().Div(float64(count))
toAdd, err := left.Sub(right)
if err != nil {
return nil, mean, true, nil
}
_, err = mean.Add(toAdd)
if err != nil {
return nil, mean, true, nil
}
}
return nil, mean, true, nil
}
return avgOverTime(f.Samples), nil, true, nil

v := avgOverTime(f.Samples)
return &v, nil, true, nil
},
"stddev_over_time": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
if len(f.Samples) == 0 {
Expand Down Expand Up @@ -150,7 +191,14 @@ var rangeVectorFuncs = map[string]FunctionCall{
if len(f.Samples) < 2 {
return nil, nil, false, nil
}
return deriv(f.Samples), nil, true, nil

if f.Samples[0].V.H != nil {
// deriv should ignore histograms.
return nil, nil, false, nil
}

v := deriv(f.Samples)
return &v, nil, true, nil
},
"irate": func(f FunctionArgs) (*float64, *histogram.FloatHistogram, bool, error) {
f.Samples = filterFloatOnlySamples(f.Samples)
Expand Down Expand Up @@ -459,15 +507,28 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo
}

prev := points[0].V.H // We already know that this is a histogram.
usingCustomBuckets := prev.UsesCustomBuckets()
last := points[len(points)-1].V.H
if last == nil {
warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx)
return nil, nil // Range contains a mix of histograms and floats.
}
minSchema := prev.Schema
if last.Schema < minSchema {
minSchema = last.Schema
}

if last.UsesCustomBuckets() != usingCustomBuckets {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
return nil, nil
}

// We check for gauge type histograms in the loop below, but the loop below does not run on the first and last point,
// so check the first and last point now.
if isCounter && (prev.CounterResetHint == histogram.GaugeType || last.CounterResetHint == histogram.GaugeType) {
warnings.AddToContext(annotations.NativeHistogramNotCounterWarning, ctx)
}

// https://github.com/prometheus/prometheus/blob/ccea61c7bf1e6bce2196ba8189a209945a204c5b/promql/functions.go#L183
// First iteration to find out two things:
// - What's the smallest relevant schema?
Expand All @@ -476,18 +537,31 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo
for _, currPoint := range points[1 : len(points)-1] {
curr := currPoint.V.H
if curr == nil {
warnings.AddToContext(annotations.MixedFloatsHistogramsWarning, ctx)
return nil, nil // Range contains a mix of histograms and floats.
}
if !isCounter {
continue
}
if curr.CounterResetHint == histogram.GaugeType {
warnings.AddToContext(annotations.NativeHistogramNotCounterWarning, ctx)
}
if curr.Schema < minSchema {
minSchema = curr.Schema
}
if curr.UsesCustomBuckets() != usingCustomBuckets {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
return nil, nil
}
}

h := last.CopyToSchema(minSchema)
if _, err := h.Sub(prev); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
} else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx)
}
return nil, err
}

Expand All @@ -497,12 +571,20 @@ func histogramRate(ctx context.Context, points []Sample, isCounter bool) (*histo
curr := currPoint.V.H
if curr.DetectReset(prev) {
if _, err := h.Add(prev); err != nil {
if errors.Is(err, histogram.ErrHistogramsIncompatibleSchema) {
warnings.AddToContext(annotations.MixedExponentialCustomHistogramsWarning, ctx)
} else if errors.Is(err, histogram.ErrHistogramsIncompatibleBounds) {
warnings.AddToContext(annotations.IncompatibleCustomBucketsHistogramsWarning, ctx)
}
return nil, err
}
}
prev = curr
}
} else if points[0].V.H.CounterResetHint != histogram.GaugeType || points[len(points)-1].V.H.CounterResetHint != histogram.GaugeType {
warnings.AddToContext(annotations.NativeHistogramNotGaugeWarning, ctx)
}

h.CounterResetHint = histogram.GaugeType
return h.Compact(0), nil
}
Expand Down

0 comments on commit 1c3604e

Please sign in to comment.