diff --git a/go.mod b/go.mod index 9337bb14..4712b9b0 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,7 @@ require ( github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.18.2 + github.com/stretchr/testify v1.9.0 github.com/xdg/scram v1.0.5 go.starlark.net v0.0.0-20231121155337-90ade8b19d09 golang.org/x/crypto v0.26.0 @@ -144,6 +145,7 @@ require ( github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pjbgf/sha1cd v0.3.0 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rivo/uniseg v0.4.4 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/pkg/formatters/all/all.go b/pkg/formatters/all/all.go index 54a66fdb..f95ff4ce 100644 --- a/pkg/formatters/all/all.go +++ b/pkg/formatters/all/all.go @@ -30,5 +30,6 @@ import ( _ "github.com/openconfig/gnmic/pkg/formatters/event_to_tag" _ "github.com/openconfig/gnmic/pkg/formatters/event_trigger" _ "github.com/openconfig/gnmic/pkg/formatters/event_value_tag" + _ "github.com/openconfig/gnmic/pkg/formatters/event_value_tag_v2" _ "github.com/openconfig/gnmic/pkg/formatters/event_write" ) diff --git a/pkg/formatters/event_value_tag/event_value_tag.go b/pkg/formatters/event_value_tag/event_value_tag.go index e5acab6d..dc4af65b 100644 --- a/pkg/formatters/event_value_tag/event_value_tag.go +++ b/pkg/formatters/event_value_tag/event_value_tag.go @@ -44,6 +44,9 @@ func (vt *valueTag) Init(cfg interface{}, opts ...formatters.Option) error { if err != nil { return err } + if vt.TagName == "" { + vt.TagName = vt.ValueName + } for _, opt := range opts { opt(vt) } @@ -65,17 +68,17 @@ type tagVal struct { } func (vt *valueTag) Apply(evs ...*formatters.EventMsg) []*formatters.EventMsg { - if vt.TagName == "" { - vt.TagName = vt.ValueName - } - - cache := make(map[string]bool) vts := vt.buildApplyRules(evs) for _, tv := range vts { for _, ev := range evs { - match := compareTags(tv.tags, ev.Tags, cache) + match := compareTags(tv.tags, ev.Tags) if match { - ev.Tags[vt.TagName] = fmt.Sprint(tv.value) + switch v := tv.value.(type) { + case string: + ev.Tags[vt.TagName] = v + default: + ev.Tags[vt.TagName] = fmt.Sprint(tv.value) + } } } } @@ -95,24 +98,15 @@ func (vt *valueTag) WithTargets(tcs map[string]*types.TargetConfig) {} func (vt *valueTag) WithActions(act map[string]map[string]interface{}) {} // returns true if all keys match, false otherwise. -func compareTags(a map[string]string, b map[string]string, cache map[string]bool) bool { - cacheKey := fmt.Sprintf("%v-%v", a, b) - if cachedResult, exists := cache[cacheKey]; exists { - return cachedResult - } - +func compareTags(a map[string]string, b map[string]string) bool { if len(a) > len(b) { - cache[cacheKey] = false return false } for k, v := range a { if vv, ok := b[k]; !ok || v != vv { - cache[cacheKey] = false return false } } - - cache[cacheKey] = true return true } @@ -122,7 +116,11 @@ func (vt *valueTag) buildApplyRules(evs []*formatters.EventMsg) []*tagVal { toApply := make([]*tagVal, 0) for _, ev := range evs { if v, ok := ev.Values[vt.ValueName]; ok { - toApply = append(toApply, &tagVal{tags: ev.Tags, value: v}) + toApply = append(toApply, + &tagVal{ + tags: copyTags(ev.Tags), + value: v, + }) if vt.Consume { delete(ev.Values, vt.ValueName) } @@ -130,3 +128,11 @@ func (vt *valueTag) buildApplyRules(evs []*formatters.EventMsg) []*tagVal { } return toApply } + +func copyTags(src map[string]string) map[string]string { + dest := make(map[string]string, len(src)) + for k, v := range src { + dest[k] = v + } + return dest +} diff --git a/pkg/formatters/event_value_tag/event_value_tag_test.go b/pkg/formatters/event_value_tag/event_value_tag_test.go index 84f2d89b..b30f9c0f 100644 --- a/pkg/formatters/event_value_tag/event_value_tag_test.go +++ b/pkg/formatters/event_value_tag/event_value_tag_test.go @@ -10,6 +10,7 @@ package event_value_tag import ( "fmt" + "log" "reflect" "testing" @@ -30,6 +31,7 @@ var testset = map[string]struct { processorType: processorType, processor: map[string]interface{}{ "value-name": "foo", + "debug": true, }, tests: []item{ { @@ -51,6 +53,16 @@ var testset = map[string]struct { Tags: map[string]string{"tag": "value"}, Values: map[string]interface{}{"foo": "new_value"}, }, + { + Timestamp: 3, + Tags: map[string]string{"other_tag": "value"}, + Values: map[string]interface{}{"other_val": "val"}, + }, + { + Timestamp: 4, + Tags: map[string]string{"foo": "other_value"}, + Values: map[string]interface{}{"other_val": "val"}, + }, }, output: []*formatters.EventMsg{ { @@ -62,6 +74,16 @@ var testset = map[string]struct { Tags: map[string]string{"tag": "value", "foo": "new_value"}, Values: map[string]interface{}{"foo": "new_value"}, }, + { + Timestamp: 3, + Tags: map[string]string{"other_tag": "value"}, + Values: map[string]interface{}{"other_val": "val"}, + }, + { + Timestamp: 4, + Tags: map[string]string{"foo": "other_value"}, + Values: map[string]interface{}{"other_val": "val"}, + }, }, }, { @@ -272,6 +294,94 @@ var testset = map[string]struct { }, }, }, + "integer_val": { + processorType: processorType, + processor: map[string]interface{}{ + "value-name": "foo", + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: make([]*formatters.EventMsg, 0), + output: make([]*formatters.EventMsg, 0), + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": 42}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "foo": "42"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "foo": "42"}, + Values: map[string]interface{}{"foo": 42}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value1"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value1", "foo": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + }, + }, + }, } func TestEventValueTag(t *testing.T) { @@ -279,7 +389,7 @@ func TestEventValueTag(t *testing.T) { if pi, ok := formatters.EventProcessors[ts.processorType]; ok { t.Log("found processor") p := pi() - err := p.Init(ts.processor) + err := p.Init(ts.processor, formatters.WithLogger(log.Default())) if err != nil { t.Errorf("failed to initialize processors: %v", err) return @@ -320,7 +430,7 @@ func generateEventMsgs(numEvents, numValues int, targetKey, targetValue string) } func BenchmarkBuildApplyRules(b *testing.B) { - evs := generateEventMsgs(100, 10, "targetKey", "targetValue") + evs := generateEventMsgs(100_000, 10, "targetKey", "targetValue") vt := &valueTag{ValueName: "targetKey", Consume: true} b.ResetTimer() @@ -330,7 +440,7 @@ func BenchmarkBuildApplyRules(b *testing.B) { } func BenchmarkBuildApplyRules2(b *testing.B) { - evs := generateEventMsgs(100, 10, "targetKey", "targetValue") + evs := generateEventMsgs(100_000, 10, "targetKey", "targetValue") vt := &valueTag{ValueName: "targetKey", Consume: true} b.ResetTimer() @@ -346,7 +456,11 @@ func (vt *valueTag) buildApplyRules2(evs []*formatters.EventMsg) []*tagVal { for _, ev := range evs { for k, v := range ev.Values { if vt.ValueName == k { - toApply = append(toApply, &tagVal{ev.Tags, v}) + toApply = append(toApply, &tagVal{ + // copyTags(ev.Tags), + ev.Tags, + v, + }) if vt.Consume { delete(ev.Values, vt.ValueName) } diff --git a/pkg/formatters/event_value_tag_v2/event_value_tag_v2.go b/pkg/formatters/event_value_tag_v2/event_value_tag_v2.go new file mode 100644 index 00000000..06f9505a --- /dev/null +++ b/pkg/formatters/event_value_tag_v2/event_value_tag_v2.go @@ -0,0 +1,174 @@ +// © 2025 Nokia. +// +// This code is a Contribution to the gNMIc project (“Work”) made under the Google Software Grant and Corporate Contributor License Agreement (“CLA”) and governed by the Apache License 2.0. +// No other rights or licenses in or to any of Nokia’s intellectual property are granted for any other purpose. +// This code is provided on an “as is” basis without any warranties of any kind. +// +// SPDX-License-Identifier: Apache-2.0 + +package event_value_tag_v2 + +import ( + "encoding/json" + "fmt" + "hash/fnv" + "io" + "log" + "os" + "slices" + + "github.com/openconfig/gnmic/pkg/api/types" + "github.com/openconfig/gnmic/pkg/api/utils" + "github.com/openconfig/gnmic/pkg/formatters" +) + +const ( + processorType = "event-value-tag-v2" + loggingPrefix = "[" + processorType + "] " +) + +var ( + eqByte = []byte("=") + semiC = []byte(";") + pipeByte = []byte("|") +) + +type valueTag struct { + TagName string `mapstructure:"tag-name,omitempty" json:"tag-name,omitempty"` + ValueName string `mapstructure:"value-name,omitempty" json:"value-name,omitempty"` + Consume bool `mapstructure:"consume,omitempty" json:"consume,omitempty"` + Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"` + logger *log.Logger + + applyRules map[uint64]*applyRule +} + +func init() { + formatters.Register(processorType, func() formatters.EventProcessor { + return &valueTag{logger: log.New(io.Discard, "", 0)} + }) +} + +func (vt *valueTag) Init(cfg interface{}, opts ...formatters.Option) error { + err := formatters.DecodeConfig(cfg, vt) + if err != nil { + return err + } + for _, opt := range opts { + opt(vt) + } + + if vt.TagName == "" { + vt.TagName = vt.ValueName + } + + vt.applyRules = make(map[uint64]*applyRule) + + if vt.logger.Writer() != io.Discard { + b, err := json.Marshal(vt) + if err != nil { + vt.logger.Printf("initialized processor '%s': %+v", processorType, vt) + return nil + } + vt.logger.Printf("initialized processor '%s': %s", processorType, string(b)) + } + return nil +} + +type applyRule struct { + // Set of tags that must be present in a message + // in order to add the value as tag. + tags map[string]string + // The value to be added as tag. + // The tag name is taken from the main proc struct. + value any +} + +func (vt *valueTag) Apply(evs ...*formatters.EventMsg) []*formatters.EventMsg { + vt.updateApplyRules(evs) + for _, ar := range vt.applyRules { + for _, ev := range evs { + if includedIn(ar.tags, ev.Tags) { + switch v := ar.value.(type) { + case string: + ev.Tags[vt.TagName] = v + default: + ev.Tags[vt.TagName] = fmt.Sprint(ar.value) + } + } + } + } + return evs +} + +func (vt *valueTag) WithLogger(l *log.Logger) { + if vt.Debug && l != nil { + vt.logger = log.New(l.Writer(), loggingPrefix, l.Flags()) + } else if vt.Debug { + vt.logger = log.New(os.Stderr, loggingPrefix, utils.DefaultLoggingFlags) + } +} + +func (vt *valueTag) WithTargets(tcs map[string]*types.TargetConfig) {} + +func (vt *valueTag) WithActions(act map[string]map[string]interface{}) {} + +// comparison logic for maps +// i.e: a ⊆ b +func includedIn(a, b map[string]string) bool { + if len(a) > len(b) { + return false + } + for k, v := range a { + if bv, ok := b[k]; !ok || v != bv { + return false + } + } + return true +} + +func (vt *valueTag) WithProcessors(procs map[string]map[string]any) {} + +func (vt *valueTag) updateApplyRules(evs []*formatters.EventMsg) { + for _, ev := range evs { + if v, ok := ev.Values[vt.ValueName]; ok { + // calculate apply rule Key + k := vt.applyRuleKey(ev.Tags) + vt.applyRules[k] = &applyRule{ + tags: copyTags(ev.Tags), // copy map + value: v, + } + if vt.Consume { + delete(ev.Values, vt.ValueName) + } + } + } +} + +// the apply rule key is a hash of the valueName and the event msg tags +func (vt *valueTag) applyRuleKey(m map[string]string) uint64 { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + slices.Sort(keys) + + h := fnv.New64a() + h.Write([]byte(vt.ValueName)) + h.Write(pipeByte) + for _, k := range keys { + h.Write([]byte(k)) + h.Write(eqByte) + h.Write([]byte(m[k])) + h.Write(semiC) + } + return h.Sum64() +} + +func copyTags(src map[string]string) map[string]string { + dest := make(map[string]string, len(src)) + for k, v := range src { + dest[k] = v + } + return dest +} diff --git a/pkg/formatters/event_value_tag_v2/event_value_tag_v2_test.go b/pkg/formatters/event_value_tag_v2/event_value_tag_v2_test.go new file mode 100644 index 00000000..765262e9 --- /dev/null +++ b/pkg/formatters/event_value_tag_v2/event_value_tag_v2_test.go @@ -0,0 +1,372 @@ +// © 2025 Nokia. +// +// This code is a Contribution to the gNMIc project (“Work”) made under the Google Software Grant and Corporate Contributor License Agreement (“CLA”) and governed by the Apache License 2.0. +// No other rights or licenses in or to any of Nokia’s intellectual property are granted for any other purpose. +// This code is provided on an “as is” basis without any warranties of any kind. +// +// SPDX-License-Identifier: Apache-2.0 + +package event_value_tag_v2 + +import ( + "io" + "log" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/openconfig/gnmic/pkg/formatters" +) + +type item struct { + input []*formatters.EventMsg + output []*formatters.EventMsg +} + +var testset = map[string]struct { + processorType string + processor map[string]interface{} + tests []item +}{ + "no-options": { + processorType: processorType, + processor: map[string]interface{}{ + "value-name": "foo", + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: make([]*formatters.EventMsg, 0), + output: make([]*formatters.EventMsg, 0), + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, + Values: map[string]interface{}{"foo": "new_value"}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "foo": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "foo": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + }, + }, + }, + "rename-tag": { + processorType: processorType, + processor: map[string]interface{}{ + "value-name": "foo", + "tag-name": "bar", + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: make([]*formatters.EventMsg, 0), + output: make([]*formatters.EventMsg, 0), + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "bar": "new_value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "bar": "new_value"}, + Values: map[string]interface{}{"foo": "new_value"}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "bar": "value"}, + Values: map[string]interface{}{"counter1": "1"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "bar": "value"}, + Values: map[string]interface{}{"foo": "value"}, + }, + }, + }, + }, + }, + "consume-value": { + processorType: processorType, + processor: map[string]interface{}{ + "value-name": "foo", + "consume": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: make([]*formatters.EventMsg, 0), + output: make([]*formatters.EventMsg, 0), + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"foo": "new_value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value", "foo": "new_value"}, + Values: make(map[string]interface{}, 0), + }, + }, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 1, + Tags: map[string]string{"tag": "value"}, + }, + { + Timestamp: 2, + Tags: map[string]string{"tag": "value"}, + Values: map[string]interface{}{"bar": "value"}, + }, + }, + }, + }, + }, +} + +func TestEventValueTag(t *testing.T) { + for name, ts := range testset { + if pi, ok := formatters.EventProcessors[ts.processorType]; ok { + t.Log("found processor") + for i, item := range ts.tests { + // a processor per test item + p := pi() + err := p.Init(ts.processor) + if err != nil { + t.Errorf("failed to initialize processors: %v", err) + return + } + t.Logf("processor: %+v", p) + t.Run(name, func(t *testing.T) { + t.Logf("running test item %d", i) + outs := p.Apply(item.input...) + for j := range outs { + if !reflect.DeepEqual(outs[j], item.output[j]) { + t.Errorf("failed at %s item %d, index %d, expected %+v", name, i, j, item.output[j]) + t.Errorf("failed at %s item %d, index %d, got: %+v", name, i, j, outs[j]) + } + } + }) + } + } else { + t.Errorf("event processor %s not found", ts.processorType) + } + } +} + +func TestValueTagApplySubsequentRuns(t *testing.T) { + processor := &valueTag{ + TagName: "moved-tag", + ValueName: "important-value", + Consume: true, + Debug: true, + logger: log.New(io.Discard, "", 0), + applyRules: make(map[uint64]*applyRule), + } + + // first set + events1 := []*formatters.EventMsg{ + { + Tags: map[string]string{"tag1": "value1"}, + Values: map[string]interface{}{ + "important-value": "value-to-move", + }, + }, + { + Tags: map[string]string{"tag2": "value2"}, + Values: map[string]interface{}{ + "other-value": "irrelevant", + }, + }, + } + + // first apply + processed1 := processor.Apply(events1...) + + // assert + assert.Equal(t, "value-to-move", processed1[0].Tags["moved-tag"]) + assert.NotContains(t, processed1[0].Values, "important-value") + assert.NotContains(t, processed1[1].Tags, "moved-tag") + + // second set + events2 := []*formatters.EventMsg{ + { + Tags: map[string]string{ + "tag1": "value1", + }, + Values: map[string]interface{}{ + "new-value": "some-new-data", + }, + }, + { + Tags: map[string]string{ + "tag1": "value1", + }, + Values: map[string]interface{}{ + "counter1": 42, + }, + }, + } + + // second apply + processed2 := processor.Apply(events2...) + // assert + assert.Equal(t, "value-to-move", processed2[0].Tags["moved-tag"]) + assert.Contains(t, processed2[0].Tags, "tag1") + assert.Contains(t, processed2[0].Values, "new-value") + assert.Contains(t, processed2[1].Tags, "tag1") + assert.Contains(t, processed2[1].Values, "counter1") +} diff --git a/pkg/formatters/processors.go b/pkg/formatters/processors.go index be3aaad6..a5364624 100644 --- a/pkg/formatters/processors.go +++ b/pkg/formatters/processors.go @@ -40,6 +40,7 @@ var EventProcessorTypes = []string{ "event-group-by", "event-data-convert", "event-value-tag", + "event-value-tag-v2", "event-starlark", "event-combine", "event-ieeefloat32",