diff --git a/execution/aggregate/accumulator.go b/execution/aggregate/accumulator.go new file mode 100644 index 00000000..2619f7d1 --- /dev/null +++ b/execution/aggregate/accumulator.go @@ -0,0 +1,258 @@ +package aggregate + +import ( + "math" + + "github.com/prometheus/prometheus/model/histogram" +) + +type accumulator interface { + Add(v float64, h *histogram.FloatHistogram) + Value() (float64, *histogram.FloatHistogram) + HasValue() bool + Reset(float64) +} + +type newAccumulatorFunc func() accumulator + +type sumAcc struct { + value float64 + histSum *histogram.FloatHistogram + hasFloatVal bool +} + +func newSumAcc() accumulator { + return &sumAcc{} +} + +func (s *sumAcc) Add(v float64, h *histogram.FloatHistogram) { + if h == nil { + s.hasFloatVal = true + s.value += v + return + } + if s.histSum == nil { + s.histSum = h.Copy() + return + } + // The histogram being added must have an equal or larger schema. + // https://github.com/prometheus/prometheus/blob/57bcbf18880f7554ae34c5b341d52fc53f059a97/promql/engine.go#L2448-L2456 + if h.Schema >= s.histSum.Schema { + s.histSum = s.histSum.Add(h) + } else { + t := h.Copy() + t.Add(s.histSum) + s.histSum = t + } +} + +func (s *sumAcc) Value() (float64, *histogram.FloatHistogram) { + return s.value, s.histSum +} + +// HasValue for sum returns an empty result when floats are histograms are aggregated. +func (s *sumAcc) HasValue() bool { + return s.hasFloatVal != (s.histSum != nil) +} + +func (s *sumAcc) Reset(_ float64) { + s.histSum = nil + s.hasFloatVal = false + s.value = 0 +} + +type genericAcc struct { + value float64 + hasValue bool + aggregate func(float64, float64) float64 +} + +func maxAggregate(a, b float64) float64 { + if a > b { + return a + } + return b +} +func minAggregate(a, b float64) float64 { + if a < b { + return a + } + return b +} +func groupAggregate(_, _ float64) float64 { return 1 } + +func newMaxAcc() accumulator { + return &genericAcc{aggregate: maxAggregate} +} + +func newMinAcc() accumulator { + return &genericAcc{aggregate: minAggregate} +} + +func newCountAcc() accumulator { + return &countAcc{} +} + +func newGroupAcc() accumulator { + return &genericAcc{aggregate: groupAggregate} +} + +func (g *genericAcc) Add(v float64, _ *histogram.FloatHistogram) { + if !g.hasValue || math.IsNaN(g.value) { + g.value = v + } + g.hasValue = true + g.value = g.aggregate(g.value, v) +} + +func (g *genericAcc) Value() (float64, *histogram.FloatHistogram) { + return g.value, nil +} + +func (g *genericAcc) HasValue() bool { + return g.hasValue +} + +func (g *genericAcc) Reset(_ float64) { + g.hasValue = false + g.value = 0 +} + +type countAcc struct { + value float64 + hasValue bool +} + +func (c *countAcc) Add(v float64, h *histogram.FloatHistogram) { + c.hasValue = true + c.value += 1 +} + +func (c *countAcc) Value() (float64, *histogram.FloatHistogram) { + return c.value, nil +} + +func (c *countAcc) HasValue() bool { + return c.hasValue +} + +func (c *countAcc) Reset(_ float64) { + c.hasValue = false + c.value = 0 +} + +type avgAcc struct { + count float64 + sum float64 + hasValue bool +} + +func newAvgAcc() accumulator { + return &avgAcc{} +} + +func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) { + a.hasValue = true + a.count += 1 + a.sum += v +} + +func (a *avgAcc) Value() (float64, *histogram.FloatHistogram) { + return a.sum / a.count, nil +} + +func (a *avgAcc) HasValue() bool { + return a.hasValue +} + +func (a *avgAcc) Reset(_ float64) { + a.hasValue = false + a.sum = 0 + a.count = 0 +} + +type statAcc struct { + count float64 + mean float64 + value float64 + hasValue bool +} + +func (s *statAcc) Add(v float64, h *histogram.FloatHistogram) { + s.hasValue = true + s.count++ + + delta := v - s.mean + s.mean += delta / s.count + s.value += delta * (v - s.mean) +} + +func (s *statAcc) HasValue() bool { + return s.hasValue +} + +func (s *statAcc) Reset(_ float64) { + s.hasValue = false + s.count = 0 + s.mean = 0 + s.value = 0 +} + +type stdDevAcc struct { + statAcc +} + +func newStdDevAcc() accumulator { + return &stdDevAcc{} +} + +func (s *stdDevAcc) Value() (float64, *histogram.FloatHistogram) { + if s.count == 1 { + return 0, nil + } + return math.Sqrt(s.value / s.count), nil +} + +type stdVarAcc struct { + statAcc +} + +func newStdVarAcc() accumulator { + return &stdVarAcc{} +} + +func (s *stdVarAcc) Value() (float64, *histogram.FloatHistogram) { + if s.count == 1 { + return 0, nil + } + return s.value / s.count, nil +} + +type quantileAcc struct { + arg float64 + points []float64 + hasValue bool +} + +func newQuantileAcc() accumulator { + return &quantileAcc{} +} + +func (q *quantileAcc) Add(v float64, h *histogram.FloatHistogram) { + q.hasValue = true + q.points = append(q.points, v) +} + +func (q *quantileAcc) Value() (float64, *histogram.FloatHistogram) { + return quantile(q.arg, q.points), nil +} + +func (q *quantileAcc) HasValue() bool { + return q.hasValue +} + +func (q *quantileAcc) Reset(f float64) { + q.hasValue = false + q.arg = f + q.points = q.points[:0] +} diff --git a/execution/aggregate/scalar_table.go b/execution/aggregate/scalar_table.go index 98c232fb..8b556d8c 100644 --- a/execution/aggregate/scalar_table.go +++ b/execution/aggregate/scalar_table.go @@ -28,7 +28,7 @@ type scalarTable struct { timestamp int64 inputs []uint64 outputs []*model.Series - accumulators []*accumulator + accumulators []accumulator } func newScalarTables(stepsBatch int, inputCache []uint64, outputCache []*model.Series, newAccumulator newAccumulatorFunc) []aggregateTable { @@ -40,7 +40,7 @@ func newScalarTables(stepsBatch int, inputCache []uint64, outputCache []*model.S } func newScalarTable(inputSampleIDs []uint64, outputs []*model.Series, newAccumulator newAccumulatorFunc) *scalarTable { - accumulators := make([]*accumulator, len(outputs)) + accumulators := make([]accumulator, len(outputs)) for i := 0; i < len(accumulators); i++ { accumulators[i] = newAccumulator() } @@ -67,14 +67,14 @@ func (t *scalarTable) addSample(sampleID uint64, sample float64) { outputSampleID := t.inputs[sampleID] output := t.outputs[outputSampleID] - t.accumulators[output.ID].AddFunc(sample, nil) + t.accumulators[output.ID].Add(sample, nil) } func (t *scalarTable) addHistogram(sampleID uint64, h *histogram.FloatHistogram) { outputSampleID := t.inputs[sampleID] output := t.outputs[outputSampleID] - t.accumulators[output.ID].AddFunc(0, h) + t.accumulators[output.ID].Add(0, h) } func (t *scalarTable) reset(arg float64) { @@ -87,7 +87,7 @@ func (t *scalarTable) toVector(pool *model.VectorPool) model.StepVector { result := pool.GetStepVector(t.timestamp) for i, v := range t.outputs { if t.accumulators[i].HasValue() { - f, h := t.accumulators[i].ValueFunc() + f, h := t.accumulators[i].Value() if h == nil { result.AppendSample(pool, v.ID, f) } else { @@ -141,239 +141,27 @@ func hashMetric( return key, string(bytes), builder.Labels() } -type newAccumulatorFunc func() *accumulator - -type accumulator struct { - AddFunc func(v float64, h *histogram.FloatHistogram) - ValueFunc func() (float64, *histogram.FloatHistogram) - HasValue func() bool - Reset func(arg float64) -} - func makeAccumulatorFunc(expr parser.ItemType) (newAccumulatorFunc, error) { t := parser.ItemTypeStr[expr] switch t { case "sum": - return func() *accumulator { - var value float64 - var histSum *histogram.FloatHistogram - var hasFloatVal bool - - return &accumulator{ - AddFunc: func(v float64, h *histogram.FloatHistogram) { - if h == nil { - hasFloatVal = true - value += v - return - } - if histSum == nil { - histSum = h.Copy() - return - } - // The histogram being added must have an equal or larger schema. - // https://github.com/prometheus/prometheus/blob/57bcbf18880f7554ae34c5b341d52fc53f059a97/promql/engine.go#L2448-L2456 - if h.Schema >= histSum.Schema { - histSum = histSum.Add(h) - } else { - t := h.Copy() - t.Add(histSum) - histSum = t - } - - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return value, histSum - }, - // Sum returns an empty result when floats are histograms are aggregated. - HasValue: func() bool { return hasFloatVal != (histSum != nil) }, - Reset: func(_ float64) { - histSum = nil - hasFloatVal = false - value = 0 - }, - } - }, nil + return newSumAcc, nil case "max": - return func() *accumulator { - var value float64 - var hasValue bool - - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - if !hasValue || math.IsNaN(value) || value < v { - value = v - } - hasValue = true - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return value, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - value = 0 - }, - } - }, nil + return newMaxAcc, nil case "min": - return func() *accumulator { - var value float64 - var hasValue bool - - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - if !hasValue || math.IsNaN(value) || value > v { - value = v - } - hasValue = true - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return value, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - value = 0 - }, - } - }, nil + return newMinAcc, nil case "count": - return func() *accumulator { - var value float64 - var hasValue bool - - return &accumulator{ - AddFunc: func(_ float64, _ *histogram.FloatHistogram) { - hasValue = true - value += 1 - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return value, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - value = 0 - }, - } - }, nil + return newCountAcc, nil case "avg": - return func() *accumulator { - var count, sum float64 - var hasValue bool - - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - hasValue = true - count += 1 - sum += v - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return sum / count, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - sum = 0 - count = 0 - }, - } - }, nil + return newAvgAcc, nil case "group": - return func() *accumulator { - var hasValue bool - return &accumulator{ - AddFunc: func(_ float64, _ *histogram.FloatHistogram) { - hasValue = true - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return 1, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - }, - } - }, nil + return newGroupAcc, nil case "stddev": - return func() *accumulator { - var count float64 - var mean float64 - var value float64 - var hasValue bool - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - hasValue = true - count++ - delta := v - mean - mean += delta / count - value += delta * (v - mean) - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - if count == 1 { - return 0, nil - } - return math.Sqrt(value / count), nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - count = 0 - mean = 0 - value = 0 - }, - } - }, nil + return newStdDevAcc, nil case "stdvar": - return func() *accumulator { - var count float64 - var mean float64 - var value float64 - var hasValue bool - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - hasValue = true - count++ - delta := v - mean - mean += delta / count - value += delta * (v - mean) - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - if count == 1 { - return 0, nil - } - return value / count, nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(_ float64) { - hasValue = false - count = 0 - mean = 0 - value = 0 - }, - } - }, nil + return newStdVarAcc, nil case "quantile": - return func() *accumulator { - var hasValue bool - var arg float64 - points := make([]float64, 0) - return &accumulator{ - AddFunc: func(v float64, _ *histogram.FloatHistogram) { - hasValue = true - points = append(points, v) - }, - ValueFunc: func() (float64, *histogram.FloatHistogram) { - return quantile(arg, points), nil - }, - HasValue: func() bool { return hasValue }, - Reset: func(a float64) { - hasValue = false - arg = a - points = points[:0] - }, - } - }, nil + return newQuantileAcc, nil } msg := fmt.Sprintf("unknown aggregation function %s", t) return nil, errors.Wrap(parse.ErrNotSupportedExpr, msg)