From c02f06195f1e91adc24d352717e1b0963be92b69 Mon Sep 17 00:00:00 2001 From: netixx Date: Thu, 19 Oct 2023 16:24:56 +0000 Subject: [PATCH 1/3] Add rate-limit processor --- .../event_processors/event_rate_limit.md | 28 ++ formatters/all/all.go | 1 + .../event_rate_limit/event_rate_limit.go | 135 +++++++ .../event_rate_limit/event_rate_limit_test.go | 363 ++++++++++++++++++ formatters/processors.go | 1 + go.mod | 3 +- go.sum | 6 +- mkdocs.yml | 1 + 8 files changed, 535 insertions(+), 3 deletions(-) create mode 100644 docs/user_guide/event_processors/event_rate_limit.md create mode 100644 formatters/event_rate_limit/event_rate_limit.go create mode 100644 formatters/event_rate_limit/event_rate_limit_test.go diff --git a/docs/user_guide/event_processors/event_rate_limit.md b/docs/user_guide/event_processors/event_rate_limit.md new file mode 100644 index 00000000..1104e723 --- /dev/null +++ b/docs/user_guide/event_processors/event_rate_limit.md @@ -0,0 +1,28 @@ +The `event-rate-limit` processor rate-limits each event with matching tags to the configured amount per-seconds. + +All the tags for each event is hashed, and if the hash matches a previously seen event, then the timestamp +of the event itself is compared to assess if the configured limit has be exceeded. +If it has, then this new event is dropped from the pipeline. + +The cache for comparing timestamp is an LRU cache, with a default size of 1000 that can be increased for bigger deployments. + +To account for cases where the device will artificially split the event into multiple chunks (with the same timestamp), +the rate-limiter will ignore events with exactly the same timestamp. + + +### Examples + +```yaml +processors: + # processor name + rate-limit-100pps: + # processor type + event-rate-limit: + # rate of filtering, in events per seconds + per-second: 100 + # set the cache size for doing the rate-limiting comparison + # default value is 1000 + cache-size: 10000 + # debug for additionnal logging of dropped events + debug: true +``` \ No newline at end of file diff --git a/formatters/all/all.go b/formatters/all/all.go index 9f6e0df2..5d65c7da 100644 --- a/formatters/all/all.go +++ b/formatters/all/all.go @@ -22,6 +22,7 @@ import ( _ "github.com/openconfig/gnmic/formatters/event_jq" _ "github.com/openconfig/gnmic/formatters/event_merge" _ "github.com/openconfig/gnmic/formatters/event_override_ts" + _ "github.com/openconfig/gnmic/formatters/event_rate_limit" _ "github.com/openconfig/gnmic/formatters/event_starlark" _ "github.com/openconfig/gnmic/formatters/event_strings" _ "github.com/openconfig/gnmic/formatters/event_to_tag" diff --git a/formatters/event_rate_limit/event_rate_limit.go b/formatters/event_rate_limit/event_rate_limit.go new file mode 100644 index 00000000..6bacbfd5 --- /dev/null +++ b/formatters/event_rate_limit/event_rate_limit.go @@ -0,0 +1,135 @@ +package event_rate_limit + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "os" + "sort" + "time" + + lru "github.com/hashicorp/golang-lru/v2" + "github.com/openconfig/gnmic/formatters" + "github.com/openconfig/gnmic/types" + "github.com/openconfig/gnmic/utils" +) + +const ( + processorType = "event-rate-limit" + loggingPrefix = "[" + processorType + "] " + defaultCacheSize = 1000 + oneSecond int64 = int64(time.Second) +) + +// RateLimit rate-limits the message to the given rate. +type RateLimit struct { + // formatters.EventProcessor + + PerSecondLimit float64 `mapstructure:"per-second,omitempty" json:"per-second,omitempty"` + CacheSize int `mapstructure:"cache-size,omitempty" json:"cache-size,omitempty"` + Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"` + + // eventIndex is an lru cache used to compare the events hash with known value. + // LRU cache seems like a good choice because we expect the rate-limiter to be + // most useful in burst scenarios. + // We need some form of control over the size of the cache to contain RAM usage + // so LRU is good in that respect also. + eventIndex *lru.Cache[string, int64] + logger *log.Logger +} + +func init() { + formatters.Register(processorType, func() formatters.EventProcessor { + return &RateLimit{ + logger: log.New(io.Discard, "", 0), + } + }) +} + +func (o *RateLimit) Init(cfg interface{}, opts ...formatters.Option) error { + err := formatters.DecodeConfig(cfg, o) + if err != nil { + return err + } + for _, opt := range opts { + opt(o) + } + if o.CacheSize <= 0 { + o.logger.Printf("using default value for lru size %d", defaultCacheSize) + o.CacheSize = defaultCacheSize + + } + if o.PerSecondLimit <= 0 { + return fmt.Errorf("provided limit is %f, must be greater than 0", o.PerSecondLimit) + } + if o.logger.Writer() != io.Discard { + b, err := json.Marshal(o) + if err != nil { + o.logger.Printf("initialized processor '%s': %+v", processorType, o) + return nil + } + o.logger.Printf("initialized processor '%s': %s", processorType, string(b)) + } + + o.eventIndex, err = lru.New[string, int64](o.CacheSize) + if err != nil { + return fmt.Errorf("failed to initialize cache: %w", err) + } + return nil +} + +func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg { + validEs := []*formatters.EventMsg{} + for _, e := range es { + if e == nil { + continue + } + h := hashEvent(e) + ts, has := o.eventIndex.Get(h) + // we check that we have the event hash in the map, if not, it's the first time we see the event + if val := float64(e.Timestamp - ts) * o.PerSecondLimit; has && e.Timestamp != ts && int64(val) < oneSecond { + // reject event + o.logger.Printf("dropping event val %.2f lower than configured rate", val) + continue + } + // retain the last event that passed through + o.eventIndex.Add(h, e.Timestamp) + validEs = append(validEs, e) + } + return validEs +} + +func hashEvent(e *formatters.EventMsg) string { + h := sha256.New() + tagKeys := make([]string, len(e.Tags)) + i := 0 + for tagKey := range e.Tags { + tagKeys[i] = tagKey + i++ + } + sort.Strings(tagKeys) + + for _, tagKey := range tagKeys { + h.Write([]byte(tagKey)) + h.Write([]byte("=")) + h.Write([]byte(e.Tags[tagKey])) + h.Write([]byte("\n")) + } + + return hex.EncodeToString(h.Sum(nil)) +} + +func (o *RateLimit) WithLogger(l *log.Logger) { + if o.Debug && l != nil { + o.logger = log.New(l.Writer(), loggingPrefix, l.Flags()) + } else if o.Debug { + o.logger = log.New(os.Stderr, loggingPrefix, utils.DefaultLoggingFlags) + } +} + +func (o *RateLimit) WithTargets(tcs map[string]*types.TargetConfig) {} + +func (o *RateLimit) WithActions(act map[string]map[string]interface{}) {} diff --git a/formatters/event_rate_limit/event_rate_limit_test.go b/formatters/event_rate_limit/event_rate_limit_test.go new file mode 100644 index 00000000..714198e8 --- /dev/null +++ b/formatters/event_rate_limit/event_rate_limit_test.go @@ -0,0 +1,363 @@ +package event_rate_limit + +import ( + "testing" + + "github.com/openconfig/gnmic/formatters" +) + +type item struct { + input []*formatters.EventMsg + output []*formatters.EventMsg +} + +var testset = map[string]struct { + processor map[string]interface{} + tests []item +}{ + "1pps-notags-pass": { + processor: map[string]interface{}{ + "type": processorType, + "debug": true, + "per-second": 1.0, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + { + Timestamp: 1e9+1, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + { + Timestamp: 1e9+1, + }, + }, + }, + }, + }, + "1pps-tags-pass": { + processor: map[string]interface{}{ + "type": processorType, + "per-second": 1.0, + "debug": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1+1e9, + }, + { + Timestamp: 1e9+1, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1+1e9, + }, + { + Timestamp: 1e9+1, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + }, + }, + }, + "1pps-notags-drop": { + processor: map[string]interface{}{ + "type": processorType, + "debug": true, + "per-second": 1.0, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + { + Timestamp: 1e9-1, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + }, + }, + }, + }, + "1pps-tags-drop": { + processor: map[string]interface{}{ + "type": processorType, + "per-second": 1.0, + "debug": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1e9-1, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + }, + }, + }, + }, + "100pps-tags-pass": { + processor: map[string]interface{}{ + "type": processorType, + "per-second": 100.0, + "debug": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1e9/100, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1e9/100, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + }, + }, + }, + "100pps-tags-drop": { + processor: map[string]interface{}{ + "type": processorType, + "per-second": 100.0, + "debug": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + { + Timestamp: 1e9/100-1, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + Tags: map[string]string{ + "a": "val-x", + "b": "val-y", + }, + }, + { + Timestamp: 1, + }, + }, + }, + }, + }, + "same-ts-pass": { + processor: map[string]interface{}{ + "type": processorType, + "per-second": 100.0, + "debug": true, + }, + tests: []item{ + { + input: nil, + output: nil, + }, + { + input: []*formatters.EventMsg{}, + }, + { + input: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + { + Timestamp: 0, + }, + }, + output: []*formatters.EventMsg{ + { + Timestamp: 0, + }, + { + Timestamp: 0, + }, + }, + }, + }, + }, +} + +func TestRateLimit(t *testing.T) { + for name, ts := range testset { + t.Log(name) + if typ, ok := ts.processor["type"]; ok { + t.Log("found type") + if pi, ok := formatters.EventProcessors[typ.(string)]; ok { + t.Log("found processor") + p := pi() + err := p.Init(ts.processor, formatters.WithLogger(nil)) + if err != nil { + t.Errorf("failed to initialize processors: %v", err) + return + } + t.Logf("initialized for test %s: %+v", name, p) + for i, item := range ts.tests { + t.Run(name, func(t *testing.T) { + t.Logf("running test item %d", i) + outs := p.Apply(item.input...) + if len(outs) != len(item.output) { + t.Logf("failed at event rate_limit, item %d", i) + t.Logf("different number of events between output=%d and wanted=%d", len(outs), len(item.output)) + t.Fail() + } + }) + } + } + } + } +} diff --git a/formatters/processors.go b/formatters/processors.go index 9b0e1d7f..41590eec 100644 --- a/formatters/processors.go +++ b/formatters/processors.go @@ -31,6 +31,7 @@ var EventProcessorTypes = []string{ "event-jq", "event-merge", "event-override-ts", + "event-rate-limit", "event-strings", "event-to-tag", "event-trigger", diff --git a/go.mod b/go.mod index baf00e2a..41eee4f5 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hairyhenderson/gomplate/v3 v3.11.5 github.com/hashicorp/consul/api v1.25.1 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/huandu/xstrings v1.4.0 github.com/influxdata/influxdb-client-go/v2 v2.12.3 github.com/itchyny/gojq v0.12.13 @@ -195,7 +196,7 @@ require ( github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect - github.com/hashicorp/golang-lru v0.6.0 // indirect + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/serf v0.10.1 // indirect github.com/hashicorp/vault/api v1.6.0 // indirect diff --git a/go.sum b/go.sum index 255bf172..90342114 100644 --- a/go.sum +++ b/go.sum @@ -643,8 +643,10 @@ github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= -github.com/hashicorp/golang-lru v0.6.0 h1:uL2shRDx7RTrOrTCUZEGP/wJUFiUI8QT6E7z5o8jga4= -github.com/hashicorp/golang-lru v0.6.0/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= +github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/mkdocs.yml b/mkdocs.yml index 79e28714..54c72224 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,6 +83,7 @@ nav: - JQ: user_guide/event_processors/event_jq.md - Merge: user_guide/event_processors/event_merge.md - Override TS: user_guide/event_processors/event_override_ts.md + - Rate Limit: user_guide/event_processors/event_rate_limit.md - Starlark: user_guide/event_processors/event_starlark.md - Strings: user_guide/event_processors/event_strings.md - To Tag: user_guide/event_processors/event_to_tag.md From 2a8ce11817ae2e5e64078d519ea17bce70c21ed0 Mon Sep 17 00:00:00 2001 From: netixx Date: Wed, 25 Oct 2023 07:56:50 +0000 Subject: [PATCH 2/3] Improve slice allocation, use package var for const chars and fix doc typo --- .../event_processors/event_rate_limit.md | 2 +- .../event_rate_limit/event_rate_limit.go | 32 ++++++++++++------- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/docs/user_guide/event_processors/event_rate_limit.md b/docs/user_guide/event_processors/event_rate_limit.md index 1104e723..f3990b4c 100644 --- a/docs/user_guide/event_processors/event_rate_limit.md +++ b/docs/user_guide/event_processors/event_rate_limit.md @@ -1,7 +1,7 @@ The `event-rate-limit` processor rate-limits each event with matching tags to the configured amount per-seconds. All the tags for each event is hashed, and if the hash matches a previously seen event, then the timestamp -of the event itself is compared to assess if the configured limit has be exceeded. +of the event itself is compared to assess if the configured limit has been exceeded. If it has, then this new event is dropped from the pipeline. The cache for comparing timestamp is an LRU cache, with a default size of 1000 that can be increased for bigger deployments. diff --git a/formatters/event_rate_limit/event_rate_limit.go b/formatters/event_rate_limit/event_rate_limit.go index 6bacbfd5..8441593d 100644 --- a/formatters/event_rate_limit/event_rate_limit.go +++ b/formatters/event_rate_limit/event_rate_limit.go @@ -18,10 +18,15 @@ import ( ) const ( - processorType = "event-rate-limit" - loggingPrefix = "[" + processorType + "] " - defaultCacheSize = 1000 - oneSecond int64 = int64(time.Second) + processorType = "event-rate-limit" + loggingPrefix = "[" + processorType + "] " + defaultCacheSize = 1000 + oneSecond int64 = int64(time.Second) +) + +var ( + eqChar = []byte("=") + lfChar = []byte("\n") ) // RateLimit rate-limits the message to the given rate. @@ -29,7 +34,7 @@ type RateLimit struct { // formatters.EventProcessor PerSecondLimit float64 `mapstructure:"per-second,omitempty" json:"per-second,omitempty"` - CacheSize int `mapstructure:"cache-size,omitempty" json:"cache-size,omitempty"` + CacheSize int `mapstructure:"cache-size,omitempty" json:"cache-size,omitempty"` Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"` // eventIndex is an lru cache used to compare the events hash with known value. @@ -44,7 +49,7 @@ type RateLimit struct { func init() { formatters.Register(processorType, func() formatters.EventProcessor { return &RateLimit{ - logger: log.New(io.Discard, "", 0), + logger: log.New(io.Discard, "", 0), } }) } @@ -82,7 +87,8 @@ func (o *RateLimit) Init(cfg interface{}, opts ...formatters.Option) error { } func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg { - validEs := []*formatters.EventMsg{} + validEs := make([]*formatters.EventMsg, len(es)) + acceptedCount := 0 for _, e := range es { if e == nil { continue @@ -90,16 +96,18 @@ func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg { h := hashEvent(e) ts, has := o.eventIndex.Get(h) // we check that we have the event hash in the map, if not, it's the first time we see the event - if val := float64(e.Timestamp - ts) * o.PerSecondLimit; has && e.Timestamp != ts && int64(val) < oneSecond { + if val := float64(e.Timestamp-ts) * o.PerSecondLimit; has && e.Timestamp != ts && int64(val) < oneSecond { // reject event o.logger.Printf("dropping event val %.2f lower than configured rate", val) continue } // retain the last event that passed through o.eventIndex.Add(h, e.Timestamp) - validEs = append(validEs, e) + validEs[acceptedCount] = e + acceptedCount++ } - return validEs + + return validEs[:acceptedCount] } func hashEvent(e *formatters.EventMsg) string { @@ -114,9 +122,9 @@ func hashEvent(e *formatters.EventMsg) string { for _, tagKey := range tagKeys { h.Write([]byte(tagKey)) - h.Write([]byte("=")) + h.Write(eqChar) h.Write([]byte(e.Tags[tagKey])) - h.Write([]byte("\n")) + h.Write(lfChar) } return hex.EncodeToString(h.Sum(nil)) From d62f24a5a2c0cc58aa26aab62ab44d9346a0f396 Mon Sep 17 00:00:00 2001 From: netixx Date: Wed, 25 Oct 2023 13:20:29 +0000 Subject: [PATCH 3/3] Revert to append for slice --- formatters/event_rate_limit/event_rate_limit.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/formatters/event_rate_limit/event_rate_limit.go b/formatters/event_rate_limit/event_rate_limit.go index 8441593d..454199a8 100644 --- a/formatters/event_rate_limit/event_rate_limit.go +++ b/formatters/event_rate_limit/event_rate_limit.go @@ -87,8 +87,8 @@ func (o *RateLimit) Init(cfg interface{}, opts ...formatters.Option) error { } func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg { - validEs := make([]*formatters.EventMsg, len(es)) - acceptedCount := 0 + validEs := make([]*formatters.EventMsg, 0, len(es)) + for _, e := range es { if e == nil { continue @@ -103,11 +103,10 @@ func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg { } // retain the last event that passed through o.eventIndex.Add(h, e.Timestamp) - validEs[acceptedCount] = e - acceptedCount++ + validEs = append(validEs, e) } - return validEs[:acceptedCount] + return validEs } func hashEvent(e *formatters.EventMsg) string {