[OSPP] query and write delta measure field #540

Open · wants to merge 15 commits into main
15 changes: 15 additions & 0 deletions api/proto/banyandb/database/v1/schema.proto
@@ -99,6 +99,20 @@ message FieldSpec {
  // aggregate_function indicates how to aggregate data
  model.v1.MeasureAggregate aggregate_function = 5;
}

message AggregateField {
  // aggregate_function indicates how to aggregate data
  model.v1.MeasureAggregate aggregate_function = 1 [(validate.rules).enum.defined_only = true];

  // input_field1 is the required input field
  string input_field1 = 2 [(validate.rules).string.min_len = 1];

  // input_field2 is an optional input field, only needed for certain functions
  string input_field2 = 3;

  // output_field is the required field where the aggregation result will be stored
  string output_field = 4 [(validate.rules).string.min_len = 1];
}

// Measure intends to store data point
message Measure {
@@ -115,6 +129,7 @@ message Measure {
  string interval = 5;
  // updated_at indicates when the measure is updated
  google.protobuf.Timestamp updated_at = 6;
  // aggregate_field defines how input fields are aggregated into output fields
  repeated AggregateField aggregate_field = 7 [(validate.rules).repeated.min_items = 1];
}

message MeasureAggregateFunction {
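For illustration, this is how a measure schema could carry one aggregation rule once the message above is generated into Go. This is a sketch only: the `databasev1` field names are assumed to follow standard protoc-gen-go naming, and `MEASURE_AGGREGATE_MIN` is a hypothetical `MeasureAggregate` value.

```go
package main

import (
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
)

func main() {
	// One aggregation rule: fold the "latency" input field into "latency_min".
	af := &databasev1.AggregateField{
		AggregateFunction: modelv1.MeasureAggregate_MEASURE_AGGREGATE_MIN, // hypothetical enum value
		InputField1:       "latency",
		OutputField:       "latency_min", // input_field2 stays empty; it is optional
	}
	// aggregate_field is repeated with min_items = 1, so at least one rule is required.
	m := &databasev1.Measure{
		AggregateField: []*databasev1.AggregateField{af},
	}
	_ = m
}
```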
50 changes: 42 additions & 8 deletions banyand/measure/block.go
@@ -34,6 +34,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)

// PREFIX is used to identify delta fields.
const PREFIX = "_"

type block struct {
timestamps []int64

Expand All @@ -56,7 +59,7 @@ func (b *block) reset() {
b.field.reset()
}

func (b *block) mustInitFromDataPoints(timestamps []int64, versions []int64, tagFamilies [][]nameValues, fields []nameValues, types []pbv1.DataPointValueType) {
	b.reset()
	size := len(timestamps)
	if size == 0 {
@@ -72,7 +75,7 @@ func (b *block) mustInitFromDataPoints(timestamps []int64, versions []int64, tag
	assertTimestampsSorted(timestamps)
	b.timestamps = append(b.timestamps, timestamps...)
	b.versions = append(b.versions, versions...)
	b.mustInitFromTagsAndFields(tagFamilies, fields, types)
}

func assertTimestampsSorted(timestamps []int64) {
@@ -84,23 +87,54 @@ func assertTimestampsSorted(timestamps []int64) {
	}
}

func (b *block) mustInitFromTagsAndFields(tagFamilies [][]nameValues, fields []nameValues, types []pbv1.DataPointValueType) {
	dataPointsLen := len(tagFamilies)
	if dataPointsLen == 0 {
		return
	}
	for i, tff := range tagFamilies {
		b.processTagFamilies(tff, i, dataPointsLen)
	}
	multiplier := 1
	containsDelta, allDelta := containsDelta(types)
	if containsDelta {
		multiplier = 2
	}

	for i, f := range fields {
		columns := b.field.resizeColumns(len(f.values) * multiplier)
		for j, t := range f.values {
			// In a block that mixes delta and non-delta data points, delta
			// values are routed into a second bank of columns appended after
			// the original ones.
			columnIndex := j
			if types[i] == pbv1.DataPointValueTypeDelta && !allDelta {
				columnIndex = len(f.values) + j
			}
			columns[columnIndex].resizeValues(dataPointsLen)
			columns[columnIndex].valueType = t.valueType
			columns[columnIndex].values[i] = t.marshal()

			switch types[i] {
			case pbv1.DataPointValueTypeDelta:
				// Delta columns carry the PREFIX so they cannot collide with
				// cumulative columns of the same field name.
				columns[columnIndex].name = PREFIX + t.name
				columns[columnIndex].datapointType = pbv1.DataPointValueTypeDelta
			case pbv1.DataPointValueTypeCumulative:
				columns[columnIndex].datapointType = pbv1.DataPointValueTypeCumulative
				columns[columnIndex].name = t.name
			default:
				columns[columnIndex].datapointType = pbv1.DataPointValueTypeUnspecified
				columns[columnIndex].name = t.name
			}
		}
	}
}

func containsDelta(types []pbv1.DataPointValueType) (hasDelta bool, allDelta bool) {
	deltaCount := 0
	for _, t := range types {
		if t == pbv1.DataPointValueTypeDelta {
			deltaCount++
		}
	}
	return deltaCount > 0, deltaCount == len(types)
}

func (b *block) processTagFamilies(tff []nameValues, i int, dataPointsLen int) {
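To make the column layout concrete: in a block that mixes cumulative and delta data points, `resizeColumns` doubles the column slice and delta values are written into the second bank under `PREFIX`-renamed names. A minimal standalone sketch of that indexing rule, with a local stand-in for `pbv1.DataPointValueType`:

```go
package main

import "fmt"

// dpType is a local stand-in for pbv1.DataPointValueType.
type dpType int

const (
	cumulative dpType = iota
	delta
)

func main() {
	fieldNames := []string{"total", "value"}
	// One cumulative and one delta data point share the block,
	// so the multiplier is 2 and allDelta is false.
	types := []dpType{cumulative, delta}

	columns := make([]string, len(fieldNames)*2)
	for _, t := range types {
		for j, name := range fieldNames {
			idx := j
			if t == delta {
				// Delta values land in the second bank, renamed with PREFIX ("_").
				idx = len(fieldNames) + j
				name = "_" + name
			}
			columns[idx] = name
		}
	}
	fmt.Println(columns) // [total value _total _value]
}
```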
4 changes: 3 additions & 1 deletion banyand/measure/block_test.go
@@ -138,6 +138,7 @@ func Test_block_mustInitFromDataPoints(t *testing.T) {
		versions    []int64
		tagFamilies [][]nameValues
		fields      []nameValues
		types       []pbv1.DataPointValueType
	}
	tests := []struct {
		name string
@@ -149,6 +150,7 @@
			args: args{
				timestamps: []int64{1, 2},
				versions:   []int64{1, 1},
				types:      []pbv1.DataPointValueType{pbv1.DataPointValueTypeUnspecified, pbv1.DataPointValueTypeUnspecified},
				tagFamilies: [][]nameValues{
					{
						{
@@ -214,7 +216,7 @@
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			b := &block{}
			b.mustInitFromDataPoints(tt.args.timestamps, tt.args.versions, tt.args.tagFamilies, tt.args.fields, tt.args.types)
			if !reflect.DeepEqual(*b, tt.want) {
				t.Errorf("block.mustInitFromDataPoints() = %+v, want %+v", *b, tt.want)
			}
7 changes: 5 additions & 2 deletions banyand/measure/block_writer.go
@@ -24,6 +24,7 @@
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

@@ -186,14 +187,16 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string
	bw.writers.fieldValuesWriter.init(fs.MustCreateFile(fileSystem, filepath.Join(path, fieldValuesFilename), filePermission))
}

func (bw *blockWriter) MustWriteDataPoints(sid common.SeriesID, timestamps, versions []int64, tagFamilies [][]nameValues,
	fields []nameValues, types []pbv1.DataPointValueType,
) {
	if len(timestamps) == 0 {
		return
	}

	b := generateBlock()
	defer releaseBlock(b)
	b.mustInitFromDataPoints(timestamps, versions, tagFamilies, fields, types)
	bw.mustWriteBlock(sid, b)
}

8 changes: 5 additions & 3 deletions banyand/measure/column.go
@@ -26,9 +26,10 @@
)

type column struct {
	name          string
	values        [][]byte
	valueType     pbv1.ValueType
	datapointType pbv1.DataPointValueType
}

func (c *column) reset() {
@@ -56,6 +57,7 @@ func (c *column) mustWriteTo(cm *columnMetadata, columnWriter *writer) {

	cm.name = c.name
	cm.valueType = c.valueType
	cm.datapointType = c.datapointType

	bb := bigValuePool.Generate()
	defer bigValuePool.Release(bb)
3 changes: 2 additions & 1 deletion banyand/measure/column_metadata.go
@@ -46,7 +46,8 @@
type columnMetadata struct {
	name string
	dataBlock
	valueType     pbv1.ValueType
	datapointType pbv1.DataPointValueType
}

func (cm *columnMetadata) reset() {
1 change: 1 addition & 0 deletions banyand/measure/datapoints.go
@@ -121,6 +121,7 @@ type dataPoints struct {
	versions    []int64
	tagFamilies [][]nameValues
	fields      []nameValues
	types       []pbv1.DataPointValueType
}

func (d *dataPoints) skip(i int) {
12 changes: 10 additions & 2 deletions banyand/measure/part.go
@@ -28,6 +28,7 @@
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/pool"
)

@@ -150,6 +151,12 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) {
	uncompressedBlockSizeBytes := uint64(0)
	var indexPrev int
	var tsPrev int64
	if len(dps.types) == 0 {
		// Writers that predate delta support supply no types; default every
		// data point to the unspecified type for backward compatibility.
		dps.types = make([]pbv1.DataPointValueType, len(dps.timestamps))
		for i := range dps.types {
			dps.types[i] = pbv1.DataPointValueTypeUnspecified
		}
	}
	for i := 0; i < len(dps.timestamps); i++ {
		sid := dps.seriesIDs[i]
		if sidPrev == 0 {
@@ -167,15 +174,15 @@

		if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
			(i-indexPrev) > maxBlockLength || sid != sidPrev {
			bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:i], dps.versions[indexPrev:i], dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i], dps.types[indexPrev:i])
			sidPrev = sid
			indexPrev = i
			tsPrev = dps.timestamps[indexPrev]
			uncompressedBlockSizeBytes = 0
		}
		uncompressedBlockSizeBytes += uncompressedDataPointSizeBytes(i, dps)
	}
	bsw.MustWriteDataPoints(sidPrev, dps.timestamps[indexPrev:], dps.versions[indexPrev:], dps.tagFamilies[indexPrev:], dps.fields[indexPrev:], dps.types[indexPrev:])
	bsw.Flush(&mp.partMetadata)
	releaseBlockWriter(bsw)
}
@@ -202,6 +209,7 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
	// 8 bytes for timestamp
	// 8 bytes for version
	// 1 byte for type
	n := uint64(8 + 8 + 1)
	n += uint64(len(dps.fields[index].name))
	for i := range dps.fields[index].values {
1 change: 1 addition & 0 deletions banyand/measure/write.go
@@ -91,6 +91,7 @@ func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *me
	}
	dpt.dataPoints.timestamps = append(dpt.dataPoints.timestamps, ts)
	dpt.dataPoints.versions = append(dpt.dataPoints.versions, req.DataPoint.Version)
	dpt.dataPoints.types = append(dpt.dataPoints.types, pbv1.ConvertDataPointValueType(req.DataPoint.Type))
	stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
	if !ok {
		return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata())
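On the write path, a client marks a data point as delta through the new `type` field on `DataPointValue`, which `writeCallback.handle` converts via `pbv1.ConvertDataPointValueType` (defined in `pkg/pb/v1/value.go` below). A hedged sketch; `Version` and `Type` appear in this diff, while `Timestamp` is assumed from the existing `DataPointValue` message:

```go
package main

import (
	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// A data point flagged as delta; the server stores its fields in
	// PREFIX-named delta columns instead of the cumulative ones.
	dp := &measurev1.DataPointValue{
		Timestamp: timestamppb.Now(), // assumed field name
		Version:   1,
		Type:      measurev1.DataPointValue_TYPE_DELTA,
	}
	_ = dp
}
```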
20 changes: 20 additions & 0 deletions docs/api-reference.md
@@ -60,6 +60,7 @@
- [Sort](#banyandb-model-v1-Sort)

- [banyandb/database/v1/schema.proto](#banyandb_database_v1_schema-proto)
  - [AggregateField](#banyandb-database-v1-AggregateField)
  - [Entity](#banyandb-database-v1-Entity)
  - [FieldSpec](#banyandb-database-v1-FieldSpec)
  - [IndexRule](#banyandb-database-v1-IndexRule)
@@ -985,6 +986,24 @@ Each item in a string array is seen as a token instead of a query expression.



<a name="banyandb-database-v1-AggregateField"></a>

### AggregateField

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| aggregate_function | [banyandb.model.v1.MeasureAggregate](#banyandb-model-v1-MeasureAggregate) | | aggregate_function indicates how to aggregate data |
| input_field1 | [string](#string) | | input_field1 is the required input field |
| input_field2 | [string](#string) | | input_field2 is an optional input field, only needed for certain functions |
| output_field | [string](#string) | | output_field is the required field where the aggregation result will be stored |

<a name="banyandb-database-v1-Entity"></a>

### Entity
@@ -1076,6 +1095,7 @@ Measure intends to store data point
| entity | [Entity](#banyandb-database-v1-Entity) | | entity indicates which tags will be to generate a series and shard a measure |
| interval | [string](#string) | | interval indicates how frequently to send a data point. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h", "d". |
| updated_at | [google.protobuf.Timestamp](#google-protobuf-Timestamp) | | updated_at indicates when the measure is updated |
| aggregate_field | [AggregateField](#banyandb-database-v1-AggregateField) | repeated | aggregate_field defines how input fields are aggregated into output fields |



25 changes: 25 additions & 0 deletions pkg/pb/v1/value.go
@@ -24,12 +24,37 @@
"github.com/pkg/errors"

databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
)

// DataPointValueType is the type of a data point value: delta or cumulative.
type DataPointValueType byte

// DataPointValueTypeUnspecified is the undefined type; the remaining values
// mark a data point as cumulative or delta.
const (
	DataPointValueTypeUnspecified DataPointValueType = iota
	DataPointValueTypeCumulative
	DataPointValueTypeDelta
)

// ConvertDataPointValueType converts measurev1.DataPointValue_Type to DataPointValueType.
func ConvertDataPointValueType(protoType measurev1.DataPointValue_Type) DataPointValueType {
	switch protoType {
	case measurev1.DataPointValue_TYPE_UNSPECIFIED:
		return DataPointValueTypeUnspecified
	case measurev1.DataPointValue_TYPE_CUMULATIVE:
		return DataPointValueTypeCumulative
	case measurev1.DataPointValue_TYPE_DELTA:
		return DataPointValueTypeDelta
	default:
		panic("unknown data point value type")
	}
}

// ValueType is the type of the tag and field value.
type ValueType byte

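Finally, a quick usage check of the conversion helper, using only identifiers that appear in this diff:

```go
package main

import (
	"fmt"

	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
)

func main() {
	// Map the wire enum onto the storage-side type used by the measure engine.
	t := pbv1.ConvertDataPointValueType(measurev1.DataPointValue_TYPE_DELTA)
	fmt.Println(t == pbv1.DataPointValueTypeDelta) // true
}
```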