Skip to content

Commit

Permalink
Update Prometheus
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
  • Loading branch information
harry671003 committed Nov 15, 2024
1 parent 097e6e9 commit 8df4db9
Show file tree
Hide file tree
Showing 18 changed files with 619 additions and 250 deletions.
169 changes: 99 additions & 70 deletions engine/engine_test.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions engine/enginefuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type testCase struct {
query string
loads []string
oldRes, newRes *promql.Result
start time.Time
end time.Time
step time.Duration
}

func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
Expand Down Expand Up @@ -116,6 +119,9 @@ func FuzzEnginePromQLSmithRangeQuery(f *testing.F) {
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
start: start,
end: end,
step: interval,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -198,6 +204,7 @@ func FuzzEnginePromQLSmithInstantQuery(f *testing.F) {
newRes: newResult,
oldRes: oldResult,
loads: []string{load},
start: queryTime,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -314,6 +321,9 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) {
newRes: newResult,
oldRes: oldResult,
loads: []string{load, load2},
start: start,
end: end,
step: interval,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -419,6 +429,7 @@ func FuzzDistributedEnginePromQLSmithInstantQuery(f *testing.F) {
newRes: newResult,
oldRes: oldResult,
loads: []string{load, load2},
start: queryTime,
}
}
validateTestCases(t, cases)
Expand Down Expand Up @@ -450,6 +461,7 @@ func validateTestCases(t *testing.T, cases []*testCase) {
t.Logf(load)
}
t.Logf(c.query)
t.Logf("start: %v, end: %v, step: %v\n", c.start.Unix(), c.end.Unix(), c.step.Seconds())

t.Logf("case %d error mismatch.\nnew result: %s\nold result: %s\n", i, c.newRes.String(), c.oldRes.String())
failures++
Expand Down
22 changes: 18 additions & 4 deletions engine/existing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ func TestRangeQuery(t *testing.T) {
Load: `load 30s
bar 0 1 10 100 1000`,
Query: `sum_over_time(bar[30s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 10, T: 60000}, {F: 1000, T: 120000}},
Metric: labels.Labels{},
},
},
Start: time.Unix(0, 0),
End: time.Unix(120, 0),
Interval: 60 * time.Second,
},
{
Name: "sum_over_time with all values",
Load: `load 30s
bar 0 1 10 100 1000`,
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Expand All @@ -46,7 +61,7 @@ func TestRangeQuery(t *testing.T) {
Name: "sum_over_time with trailing values",
Load: `load 30s
bar 0 1 10 100 1000 0 0 0 0`,
Query: `sum_over_time(bar[30s])`,
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}},
Expand All @@ -61,7 +76,7 @@ func TestRangeQuery(t *testing.T) {
Name: "sum_over_time with all values long",
Load: `load 30s
bar 0 1 10 100 1000 10000 100000 1000000 10000000`,
Query: `sum_over_time(bar[30s])`,
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 0, T: 0}, {F: 11, T: 60000}, {F: 1100, T: 120000}, {F: 110000, T: 180000}, {F: 11000000, T: 240000}},
Expand All @@ -76,7 +91,7 @@ func TestRangeQuery(t *testing.T) {
Name: "sum_over_time with all values random",
Load: `load 30s
bar 5 17 42 2 7 905 51`,
Query: `sum_over_time(bar[30s])`,
Query: `sum_over_time(bar[45s])`,
Result: promql.Matrix{
promql.Series{
Floats: []promql.FPoint{{F: 5, T: 0}, {F: 59, T: 60000}, {F: 9, T: 120000}, {F: 956, T: 180000}},
Expand Down Expand Up @@ -123,7 +138,6 @@ func TestRangeQuery(t *testing.T) {
Timeout: 1 * time.Hour,
}
ng := engine.New(engine.Opts{EngineOpts: opts})

for _, c := range cases {
c := c
t.Run(c.Name, func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions engine/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ func TestAnalyzeOutputNode_Samples(t *testing.T) {
| | |---[duplicateLabelCheck]: 0 peak: 0
| | | |---[coalesce]: 0 peak: 0
| | | | |---[concurrent(buff=2)]: 0 peak: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): 1061 peak: 21
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 0 mod 2): 1010 peak: 20
| | | | |---[concurrent(buff=2)]: 0 peak: 0
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): 1061 peak: 21
| | | | | |---[matrixSelector] rate({[__name__="http_requests_total"]}[10m0s] 1 mod 2): 1010 peak: 20
`
require.EqualValues(t, expected, result)
}
Expand Down
7 changes: 6 additions & 1 deletion engine/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,12 @@ func (s sortByLabelFuncResult) comparer(samples *promql.Vector) func(i, j int) b
return s.sortOrder == sortOrderDesc
}
}
return valueCompare(s.sortOrder, (*samples)[i].F, (*samples)[j].F)
// If all labels provided as arguments were equal, sort by the full label set. This ensures a consistent ordering.
if lblsCmp := labels.Compare(iLb.Labels(), jLb.Labels()); lblsCmp < 0 {
return s.sortOrder == sortOrderAsc
} else {
return s.sortOrder == sortOrderDesc
}
}
}

Expand Down
70 changes: 58 additions & 12 deletions execution/aggregate/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,12 @@ func (c *countAcc) Reset(_ float64) {
}

type avgAcc struct {
avg float64
count int64
hasValue bool
kahanSum float64
kahanC float64
avg float64
incremental bool
count int64
hasValue bool

histSum *histogram.FloatHistogram
histScratch *histogram.FloatHistogram
Expand Down Expand Up @@ -326,30 +329,55 @@ func (a *avgAcc) Add(v float64, h *histogram.FloatHistogram) error {
a.count++
if !a.hasValue {
a.hasValue = true
a.avg = v
a.kahanSum = v
return nil
}

a.hasValue = true

if !a.incremental {
newSum, newC := kahanSumInc(v, a.kahanSum, a.kahanC)

if !math.IsInf(newSum, 0) {
// The sum doesn't overflow, so we propagate it to the
// group struct and continue with the regular
// calculation of the mean value.
a.kahanSum, a.kahanC = newSum, newC
return nil
}

// If we are here, we know that the sum _would_ overflow. So
// instead of continue to sum up, we revert to incremental
// calculation of the mean value from here on.
a.incremental = true
a.avg = a.kahanSum / float64(a.count-1)
a.kahanC /= float64(a.count) - 1
}

if math.IsInf(a.avg, 0) {
if math.IsInf(v, 0) && (a.avg > 0) == (v > 0) {
// The `avg` and `v` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `avg` is correct
// The `floatMean` and `s.F` values are `Inf` of the same sign. They
// can't be subtracted, but the value of `floatMean` is correct
// already.
return nil
}
if !math.IsInf(v, 0) && !math.IsNaN(v) {
// At this stage, the avg is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that avg
// At this stage, the mean is an infinite. If the added
// value is neither an Inf or a Nan, we can keep that mean
// value.
// This is required because our calculation below removes
// the avg value, which would look like Inf += x - Inf and
// the mean value, which would look like Inf += x - Inf and
// end up as a NaN.
return nil
}
}

a.avg += v/float64(a.count) - a.avg/float64(a.count)
currentMean := a.avg + a.kahanC
a.avg, a.kahanC = kahanSumInc(
// Divide each side of the `-` by `group.groupCount` to avoid float64 overflows.
v/float64(a.count)-currentMean/float64(a.count),
a.avg,
a.kahanC,
)
return nil
}

Expand All @@ -368,7 +396,10 @@ func (a *avgAcc) AddVector(vs []float64, hs []*histogram.FloatHistogram) error {
}

// Value returns the accumulated float average and the histogram sum.
// In incremental mode the mean is tracked directly and its Kahan
// compensation term is added back; otherwise the mean is derived from
// the compensated running sum divided by the number of samples seen.
func (a *avgAcc) Value() (float64, *histogram.FloatHistogram) {
	if a.incremental {
		return a.avg + a.kahanC, a.histSum
	}
	return (a.kahanSum + a.kahanC) / float64(a.count), a.histSum
}

func (a *avgAcc) ValueType() ValueType {
Expand Down Expand Up @@ -562,3 +593,18 @@ func SumCompensated(s []float64) float64 {
}
return sum + c
}

// kahanSumInc folds inc into a compensated running sum.
// sum is the running total and c the accumulated compensation term;
// the updated pair is returned. If the total overflows to +/-Inf the
// compensation is reset to zero, as no finite correction is meaningful
// for an infinite sum.
func kahanSumInc(inc, sum, c float64) (newSum, newC float64) {
	total := sum + inc
	if math.IsInf(total, 0) {
		// Overflow: discard the compensation term entirely.
		return total, 0
	}
	// Neumaier's refinement of Kahan summation: derive the correction
	// from whichever operand has the larger magnitude.
	if math.Abs(sum) >= math.Abs(inc) {
		c += (sum - total) + inc
	} else {
		c += (inc - total) + sum
	}
	return total, c
}
10 changes: 6 additions & 4 deletions execution/aggregate/count_values.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,21 +160,23 @@ func (c *countValuesOperator) initSeriesOnce(ctx context.Context) error {
}
for i := range in {
ts = append(ts, in[i].T)
countPerHashbucket := make(map[uint64]map[float64]int, len(inputIdToHashBucket))
countPerHashbucket := make(map[uint64]map[string]int, len(inputIdToHashBucket))
for j := range in[i].Samples {
hash := inputIdToHashBucket[int(in[i].SampleIDs[j])]
if _, ok := countPerHashbucket[hash]; !ok {
countPerHashbucket[hash] = make(map[float64]int)
countPerHashbucket[hash] = make(map[string]int)
}
countPerHashbucket[hash][in[i].Samples[j]]++
// Using string as the key to the map so that -0 and 0 are treated as separate values.
fStr := strconv.FormatFloat(in[i].Samples[j], 'f', -1, 64)
countPerHashbucket[hash][fStr]++
}

countsPerOutputId := make(map[int]int)
for hash, counts := range countPerHashbucket {
b.Reset(hashToBucketLabels[hash])
for f, count := range counts {
// TODO: Probably we should issue a warning if we override a label here
lbls := b.Set(c.param, strconv.FormatFloat(f, 'f', -1, 64)).Labels()
lbls := b.Set(c.param, f).Labels()
hash := lbls.Hash()
outputId, ok := hashToOutputId[hash]
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,9 +407,9 @@ func getTimeRangesForVectorSelector(n *logicalplan.VectorSelector, opts *query.O
end = *n.Timestamp
}
if evalRange == 0 {
start -= opts.LookbackDelta.Milliseconds()
start -= opts.LookbackDelta.Milliseconds() - 1
} else {
start -= evalRange
start -= evalRange - 1
}
offset := n.OriginalOffset.Milliseconds()
return start - offset, end - offset
Expand Down
Loading

0 comments on commit 8df4db9

Please sign in to comment.