Skip to content

Commit

Permalink
Dev measure aggregate function (#528)
Browse files Browse the repository at this point in the history
* MeasureAggregateFunctionService Support


---------

Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
  • Loading branch information
3 people authored Oct 14, 2024
1 parent a8cee7f commit 25c1969
Show file tree
Hide file tree
Showing 15 changed files with 616 additions and 51 deletions.
39 changes: 32 additions & 7 deletions banyand/measure/aggregate/aggregate_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,20 @@ import (
)

// Void type contains nothing. It works as a placeholder for type parameters of `Arguments`.
type Void struct{}
// It's implemented as int64, but it won't be used as an int64.
type Void int64

// Input covers possible types of Function's arguments. It synchronizes with `FieldType` in schema.
// Input covers possible types of Function's arguments. It synchronizes with `FieldType`.
// It also covers Void type.
type Input interface {
Void | ~int64 | ~float64
~int64 | ~float64
}

// Output covers possible types of Function's return value.
type Output interface {
~int64 | ~float64
}

var errFieldValueType = fmt.Errorf("unsupported input value type on this field")

// Arguments represents the argument array, with one argument or two arguments.
type Arguments[A, B Input] struct {
arg0 []A
Expand All @@ -52,8 +52,9 @@ type Function[A, B Input, R Output] interface {
// It uses a two-dimensional array to represent the argument array.
Combine(arguments Arguments[A, B]) error

// Result gives the result for the aggregation.
Result() R
// Result gives the result for the aggregation. R is the aggregating result,
// A is the first aggregating state, and B is the second aggregating state.
Result() (A, B, R)
}

// NewFunction constructs the aggregate function with given kind and parameter types.
Expand All @@ -62,8 +63,18 @@ func NewFunction[A, B Input, R Output](kind modelv1.MeasureAggregate) (Function[
switch kind {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
function = &Min[A, B, R]{minimum: maxValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX:
function = &Max[A, B, R]{maximum: minValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT:
function = &Count[A, B, R]{count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_SUM:
function = &Sum[A, B, R]{summation: zeroValue[R]()}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG:
function = &Avg[A, B, R]{summation: zeroValue[R](), count: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_PERCENT:
function = &Percent[A, B, R]{total: 0, match: 0}
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_RATE:
function = &Rate[A, B, R]{denominator: 0, numerator: 0}
default:
return nil, fmt.Errorf("MeasureAggregate unknown")
}
Expand All @@ -76,6 +87,20 @@ func zeroValue[R Output]() R {
return r
}

func minValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
*a = math.MinInt64
case *float64:
*a = -math.MaxFloat64
case *string:
*a = ""
default:
panic("unreachable")
}
return
}

func maxValue[R Output]() (r R) {
switch a := any(&r).(type) {
case *int64:
Expand Down
54 changes: 29 additions & 25 deletions banyand/measure/aggregate/avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,44 @@ type Avg[A, B Input, R Output] struct {
}

// Combine takes elements to do the aggregation.
// Avg uses type parameter A and B.
func (a *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
switch arg0 := any(arg0).(type) {
case int64:
a.summation += R(arg0)
case float64:
a.summation += R(arg0)
default:
return errFieldValueType
}
// Avg uses type parameter A.
func (f *Avg[A, B, R]) Combine(arguments Arguments[A, B]) error {
i := 0
n := len(arguments.arg0)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.summation += R(arguments.arg0[i]) + R(arguments.arg0[i+1]) +
R(arguments.arg0[i+2]) + R(arguments.arg0[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.summation += R(arguments.arg0[i])
}

for _, arg1 := range arguments.arg1 {
switch arg1 := any(arg1).(type) {
case int64:
a.count += arg1
default:
return errFieldValueType
}
i = 0
n = len(arguments.arg1)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.count += int64(arguments.arg1[i]) + int64(arguments.arg1[i+1]) +
int64(arguments.arg1[i+2]) + int64(arguments.arg1[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.count += int64(arguments.arg1[i])
}

return nil
}

// Result gives the result for the aggregation.
func (a *Avg[A, B, R]) Result() R {
// In unusual situations it returns the zero value.
if a.count == 0 {
return zeroValue[R]()
func (f *Avg[A, B, R]) Result() (A, B, R) {
var average R
if f.count != 0 {
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of f float.
average = f.summation / R(f.count)
}
// According to the semantics of GoLang, the division of one int by another int
// returns an int, instead of a float.
return a.summation / R(a.count)
return A(f.summation), B(f.count), average
}

// NewAvgArguments constructs arguments.
Expand Down
12 changes: 9 additions & 3 deletions banyand/measure/aggregate/avg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ func TestAvg(t *testing.T) {
// case1: input int64 values
avgInt64, _ := aggregate.NewFunction[int64, int64, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
err = avgInt64.Combine(aggregate.NewAvgArguments[int64](
[]int64{1, 2, 3}, // mock the "summation" column
[]int64{1, 3, 3}, // mock the "summation" column
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, int64(2), avgInt64.Result()) // note that 7/3 becomes 2 as int
a1, b1, r1 := avgInt64.Result()
assert.Equal(t, int64(7), a1)
assert.Equal(t, int64(3), b1)
assert.Equal(t, int64(2), r1) // note that 7/3 becomes 2 as int

// case2: input float64 elements
avgFloat64, _ := aggregate.NewFunction[float64, int64, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_AVG)
Expand All @@ -45,5 +48,8 @@ func TestAvg(t *testing.T) {
[]int64{1, 1, 1}, // mock the "count" column
))
assert.NoError(t, err)
assert.Equal(t, 7.0/3, avgFloat64.Result())
a2, b2, r2 := avgFloat64.Result()
assert.Equal(t, 7.0, a2)
assert.Equal(t, int64(3), b2)
assert.Equal(t, 7.0/3, r2)
}
53 changes: 53 additions & 0 deletions banyand/measure/aggregate/count.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Count calculates the count value of elements.
type Count[A, B Input, R Output] struct {
count int64
}

// Combine takes elements to do the aggregation.
// Count uses none of type parameters.
func (f *Count[A, B, R]) Combine(arguments Arguments[A, B]) error {
i := 0
n := len(arguments.arg0)
// step-4 aggregate
for ; i <= n-4; i += 4 {
f.count += int64(arguments.arg0[i]) + int64(arguments.arg0[i+1]) +
int64(arguments.arg0[i+2]) + int64(arguments.arg0[i+3])
}
// tail aggregate
for ; i < n; i++ {
f.count += int64(arguments.arg0[i])
}
return nil
}

// Result gives the result for the aggregation.
func (f *Count[A, B, R]) Result() (A, B, R) {
return A(f.count), zeroValue[B](), R(f.count)
}

// NewCountArguments constructs arguments.
func NewCountArguments(a []int64) Arguments[int64, Void] {
return Arguments[int64, Void]{
arg0: a,
arg1: nil,
}
}
40 changes: 40 additions & 0 deletions banyand/measure/aggregate/count_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestCount(t *testing.T) {
var err error

// case1: input int64 values
countInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_COUNT)
err = countInt64.Combine(aggregate.NewCountArguments(
[]int64{1, 2, 3}, // mock the "count" column
))
assert.NoError(t, err)
_, _, r1 := countInt64.Result()
assert.Equal(t, int64(6), r1)
}
47 changes: 47 additions & 0 deletions banyand/measure/aggregate/max.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate

// Max calculates the maximum value of elements.
type Max[A, B Input, R Output] struct {
maximum R
}

// Combine takes elements to do the aggregation.
// Max uses type parameter A.
func (f *Max[A, B, R]) Combine(arguments Arguments[A, B]) error {
for _, arg0 := range arguments.arg0 {
if R(arg0) > f.maximum {
f.maximum = R(arg0)
}
}
return nil
}

// Result gives the result for the aggregation.
func (f *Max[A, B, R]) Result() (A, B, R) {
return A(f.maximum), zeroValue[B](), f.maximum
}

// NewMaxArguments constructs arguments.
func NewMaxArguments[A Input](a []A) Arguments[A, Void] {
return Arguments[A, Void]{
arg0: a,
arg1: nil,
}
}
49 changes: 49 additions & 0 deletions banyand/measure/aggregate/max_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package aggregate_test

import (
"testing"

"github.com/stretchr/testify/assert"

modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
)

func TestMax(t *testing.T) {
var err error

// case1: input int64 values
maxInt64, _ := aggregate.NewFunction[int64, aggregate.Void, int64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxInt64.Combine(aggregate.NewMaxArguments[int64](
[]int64{1, 2, 3}, // mock the "maximum" column
))
assert.NoError(t, err)
_, _, r1 := maxInt64.Result()
assert.Equal(t, int64(3), r1)

// case2: input float64 values
maxFloat64, _ := aggregate.NewFunction[float64, aggregate.Void, float64](modelv1.MeasureAggregate_MEASURE_AGGREGATE_MAX)
err = maxFloat64.Combine(aggregate.NewMaxArguments[float64](
[]float64{1.0, 2.0, 3.0}, // mock the "maximum" column
))
assert.NoError(t, err)
_, _, r2 := maxFloat64.Result()
assert.Equal(t, 3.0, r2)
}
Loading

0 comments on commit 25c1969

Please sign in to comment.