Skip to content

Commit

Permalink
make the lint
Browse files Browse the repository at this point in the history
  • Loading branch information
sumingfirst committed Sep 25, 2024
1 parent ef79c3e commit 3335bb9
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 43 deletions.
2 changes: 2 additions & 0 deletions banyand/measure/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// PREFIX is used to identify delta fields.
const PREFIX = "_"

type block struct {
Expand Down Expand Up @@ -134,6 +135,7 @@ func containsDelta(types []pbv1.DataPointValueType) (hasDelta bool, allDelta boo
}
return deltaCount > 0, deltaCount == len(types)
}

func (b *block) processTagFamilies(tff []nameValues, i int, dataPointsLen int) {
tagFamilies := b.resizeTagFamilies(len(tff))
for j, tf := range tff {
Expand Down
6 changes: 4 additions & 2 deletions banyand/measure/block_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
package measure

import (
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"path/filepath"

"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

Expand Down Expand Up @@ -187,7 +187,9 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string
bw.writers.fieldValuesWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, fieldValuesFilename), filePermission))
}

func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps, versions []int64, tagFamilies [][]nameValues, fields []nameValues, types []pbv1.DataPointValueType) {
func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps, versions []int64, tagFamilies [][]nameValues,
fields []nameValues, types []pbv1.DataPointValueType,
) {
if len(timestamps) == 0 {
return
}
Expand Down
2 changes: 1 addition & 1 deletion banyand/measure/part.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package measure

import (
"fmt"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"path"
"path/filepath"
"sort"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

Expand Down
5 changes: 4 additions & 1 deletion pkg/pb/v1/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,28 @@ package v1
import (
"bytes"
"fmt"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"

"github.com/pkg/errors"

databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)

// DataPointValueType is the type of the delta or cumulative value.
type DataPointValueType byte

// DataPointValueTypeUnspecified is undefined type.
const (
DataPointValueTypeUnspecified DataPointValueType = iota
DataPointValueTypeCumulative
DataPointValueTypeDelta
)

// ConvertDataPointValueType converts measurev1.DataPointValue_Type to DataPointValueType.
func ConvertDataPointValueType(protoType measurev1.DataPointValue_Type) DataPointValueType {
switch protoType {
case measurev1.DataPointValue_TYPE_UNSPECIFIED:
Expand Down
50 changes: 19 additions & 31 deletions pkg/query/logical/measure/measure_plan_indexscan_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ package measure
import (
"context"
"fmt"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
"sort"
"time"

"google.golang.org/protobuf/types/known/timestamppb"

commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure/aggregate"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
Expand Down Expand Up @@ -322,21 +322,7 @@ func (i *localIndexScan) startSpan(ctx context.Context, tracer *query.Tracer, or
}
}

func createAggregateFunctions(configs []AggregatorConfig) ([]interface{}, error) {
functions := make([]interface{}, 0, len(configs))

for _, config := range configs {
function, err := createSingleAggregateFunction(config)
if err != nil {
return nil, fmt.Errorf("error creating aggregate function: %w", err)
}
functions = append(functions, function)
}

return functions, nil
}

// create aggregate function
// create aggregate function.
func createSingleAggregateFunction(config AggregatorConfig) (interface{}, error) {
switch config.InputField.Type {
case databasev1.FieldType_FIELD_TYPE_INT:
Expand All @@ -349,7 +335,7 @@ func createSingleAggregateFunction(config AggregatorConfig) (interface{}, error)
}
}

// create int64 aggregate function
// create int64 aggregate function.
func createInt64AggregateFunction(config AggregatorConfig) (interface{}, error) {
switch config.Function {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
Expand All @@ -362,7 +348,7 @@ func createInt64AggregateFunction(config AggregatorConfig) (interface{}, error)
}
}

// create float 64 aggregate function
// create float 64 aggregate function.
func createDoubleAggregateFunction(config AggregatorConfig) (interface{}, error) {
switch config.Function {
case modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN:
Expand All @@ -375,7 +361,7 @@ func createDoubleAggregateFunction(config AggregatorConfig) (interface{}, error)
}
}

// convert field value to can be aggregated value
// convert field value to can be aggregated value.
func convertToAggregateValue(fieldValue *modelv1.FieldValue) (interface{}, error) {
switch v := fieldValue.GetValue().(type) {
case *modelv1.FieldValue_Int:
Expand Down Expand Up @@ -443,25 +429,26 @@ func buildAggregatorConfigs(fieldAggregation []*databasev1.AggregateField, field
return configs, nil
}

// AggregatorConfig use to config the aggregator
// AggregatorConfig use to config the aggregator.
type AggregatorConfig struct {
InputField FieldInfo
ExtraFields FieldInfo //Used to store additional required fields, such as the count field for AVG
ExtraFields FieldInfo // Used to store additional required fields, such as the count field for AVG.
OutputField FieldInfo

Function modelv1.MeasureAggregate // This Should Store TheSpecificFunctionName
}

// FieldInfo use to store the field information.
type FieldInfo struct {
Name string
Type databasev1.FieldType
}

type deltaResultMIterator struct {
result model.MeasureQueryResult
current []*measurev1.DataPoint
i int
interval string
current []*measurev1.DataPoint
result model.MeasureQueryResult
aggregatorConfig []AggregatorConfig
i int
}

func (di *deltaResultMIterator) Next() bool {
Expand Down Expand Up @@ -571,23 +558,22 @@ func aggregateDataPoints(dps []*measurev1.DataPoint, configs []AggregatorConfig)
return nil, fmt.Errorf("no point")
}

//Use the latest datapoint
// Use the latest datapoint.
result := dps[len(dps)-1]

for _, config := range configs {
aggregateFunc, err := createSingleAggregateFunction(config)
if err != nil {
return nil, fmt.Errorf("fail create: %w", err)
}
//inputValue, err := getFieldValue(result, config.InputField.Name)
if err != nil {
return nil, fmt.Errorf("fail get inputvalue: %w", err)
}

values := make([]interface{}, 0, len(dps))
for _, dp := range dps {
value, err := getFieldValue(dp, config.InputField.Name)
if err != nil {
value, valueErr := getFieldValue(dp, config.InputField.Name)
if valueErr != nil {
continue
}

Expand Down Expand Up @@ -648,7 +634,6 @@ func aggregateDataPoints(dps []*measurev1.DataPoint, configs []AggregatorConfig)
}

if aggregatedValue != nil {

err = addFieldToDataPoint(result, config.OutputField.Name, aggregatedValue)
if err != nil {
return nil, fmt.Errorf("failedToAddAggregateResultField: %w", err)
Expand All @@ -665,6 +650,7 @@ func aggregateDataPoints(dps []*measurev1.DataPoint, configs []AggregatorConfig)

return result, nil
}

func addFieldToDataPoint(dp *measurev1.DataPoint, name string, value interface{}) error {
var fieldValue *modelv1.FieldValue
switch v := value.(type) {
Expand Down Expand Up @@ -692,6 +678,7 @@ func getFieldValue(dp *measurev1.DataPoint, name string) (interface{}, error) {
return nil, fmt.Errorf("fieldNotFound: %s", name)
}

// CreateArguments creates the arguments for the aggregation function.
func CreateArguments[A, B aggregate.Input](
fieldType databasev1.FieldType,
aggregateType modelv1.MeasureAggregate,
Expand Down Expand Up @@ -732,6 +719,7 @@ func (di *deltaResultMIterator) Current() []*measurev1.DataPoint {
}
return nil
}

func (di *deltaResultMIterator) Close() error {
if di.result != nil {
di.result.Release()
Expand Down
18 changes: 10 additions & 8 deletions pkg/query/logical/measure/measure_plan_indexscan_local_test.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package measure

import (
"math"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"

databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/google/go-cmp/cmp"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/timestamppb"
"math"
"testing"
"time"
)

func TestGroupAndAggregate(t *testing.T) {
testCases := []struct {
name string
configs []AggregatorConfig
interval time.Duration
input *model.MeasureResult
want []*measurev1.DataPoint
configs []AggregatorConfig
interval time.Duration
}{
{
name: "simple aggregation",
Expand Down

0 comments on commit 3335bb9

Please sign in to comment.