Skip to content

Commit

Permalink
Merge pull request #262 from netixx/rate-limiter-processor
Browse files Browse the repository at this point in the history
Add rate-limit processor
  • Loading branch information
karimra authored Oct 25, 2023
2 parents 1c3d90f + d62f24a commit 0436882
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 3 deletions.
28 changes: 28 additions & 0 deletions docs/user_guide/event_processors/event_rate_limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
The `event-rate-limit` processor rate-limits each event with matching tags to the configured number of events per second.

All the tags for each event are hashed, and if the hash matches a previously seen event, then the timestamp
of the event itself is compared to assess if the configured limit has been exceeded.
If it has, then this new event is dropped from the pipeline.

The cache for comparing timestamps is an LRU cache, with a default size of 1000 that can be increased for bigger deployments.

To account for cases where the device will artificially split the event into multiple chunks (with the same timestamp),
the rate-limiter will ignore events with exactly the same timestamp.


### Examples

```yaml
processors:
# processor name
rate-limit-100pps:
# processor type
event-rate-limit:
# rate of filtering, in events per second
per-second: 100
# set the cache size for doing the rate-limiting comparison
# default value is 1000
cache-size: 10000
# debug for additional logging of dropped events
debug: true
```
1 change: 1 addition & 0 deletions formatters/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
_ "github.com/openconfig/gnmic/formatters/event_jq"
_ "github.com/openconfig/gnmic/formatters/event_merge"
_ "github.com/openconfig/gnmic/formatters/event_override_ts"
_ "github.com/openconfig/gnmic/formatters/event_rate_limit"
_ "github.com/openconfig/gnmic/formatters/event_starlark"
_ "github.com/openconfig/gnmic/formatters/event_strings"
_ "github.com/openconfig/gnmic/formatters/event_to_tag"
Expand Down
142 changes: 142 additions & 0 deletions formatters/event_rate_limit/event_rate_limit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package event_rate_limit

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"os"
"sort"
"time"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/openconfig/gnmic/formatters"
"github.com/openconfig/gnmic/types"
"github.com/openconfig/gnmic/utils"
)

const (
	// processorType is the name under which this processor is registered.
	processorType = "event-rate-limit"
	// loggingPrefix is prepended to every log line emitted by this processor.
	loggingPrefix = "[" + processorType + "] "
	// defaultCacheSize is used when no cache-size (or a non-positive one) is configured.
	defaultCacheSize = 1000
	// oneSecond is one second expressed in nanoseconds, the unit of EventMsg timestamps
	// as used by the rate comparison in Apply.
	oneSecond int64 = int64(time.Second)
)

var (
	// eqChar and lfChar are the separators written between tag keys and values
	// when hashing an event, so that distinct tag sets cannot collide by concatenation.
	eqChar = []byte("=")
	lfChar = []byte("\n")
)

// RateLimit rate-limits the message to the given rate.
// RateLimit rate-limits the message to the given rate.
type RateLimit struct {
	// formatters.EventProcessor

	// PerSecondLimit is the maximum number of events (per distinct tag set) allowed per second.
	PerSecondLimit float64 `mapstructure:"per-second,omitempty" json:"per-second,omitempty"`
	// CacheSize is the maximum number of distinct tag-set hashes tracked by the LRU cache.
	CacheSize int `mapstructure:"cache-size,omitempty" json:"cache-size,omitempty"`
	// Debug enables logging of dropped events.
	Debug bool `mapstructure:"debug,omitempty" json:"debug,omitempty"`

	// eventIndex is an lru cache used to compare the events hash with known value.
	// LRU cache seems like a good choice because we expect the rate-limiter to be
	// most useful in burst scenarios.
	// We need some form of control over the size of the cache to contain RAM usage
	// so LRU is good in that respect also.
	eventIndex *lru.Cache[string, int64]
	logger     *log.Logger
}

// init registers the processor constructor under its type name so that the
// formatters registry can instantiate it from configuration.
func init() {
	newProcessor := func() formatters.EventProcessor {
		p := &RateLimit{
			logger: log.New(io.Discard, "", 0),
		}
		return p
	}
	formatters.Register(processorType, newProcessor)
}

// Init decodes the processor configuration into o, applies options, validates
// the configured limit and initializes the LRU cache used for rate-limiting.
// It returns an error if decoding fails, if the limit is not strictly
// positive, or if the cache cannot be created.
func (o *RateLimit) Init(cfg interface{}, opts ...formatters.Option) error {
	err := formatters.DecodeConfig(cfg, o)
	if err != nil {
		return err
	}
	for _, opt := range opts {
		opt(o)
	}
	if o.CacheSize <= 0 {
		o.logger.Printf("using default value for lru size %d", defaultCacheSize)
		o.CacheSize = defaultCacheSize
	}
	if o.PerSecondLimit <= 0 {
		return fmt.Errorf("provided limit is %f, must be greater than 0", o.PerSecondLimit)
	}
	o.eventIndex, err = lru.New[string, int64](o.CacheSize)
	if err != nil {
		return fmt.Errorf("failed to initialize cache: %w", err)
	}
	if o.logger.Writer() != io.Discard {
		b, err := json.Marshal(o)
		if err != nil {
			// BUG FIX: the original returned nil here, skipping cache
			// initialization entirely when Marshal failed, leaving
			// eventIndex nil and panicking later in Apply. Now we only
			// fall back to %+v formatting and keep going.
			o.logger.Printf("initialized processor '%s': %+v", processorType, o)
		} else {
			o.logger.Printf("initialized processor '%s': %s", processorType, string(b))
		}
	}
	return nil
}

// Apply filters the given events, dropping any event whose tag-set hash was
// seen less than 1/PerSecondLimit seconds ago. Events carrying exactly the
// same timestamp as the cached one are let through (devices may split one
// event into several chunks sharing a timestamp). Nil events are skipped.
func (o *RateLimit) Apply(es ...*formatters.EventMsg) []*formatters.EventMsg {
	passed := make([]*formatters.EventMsg, 0, len(es))

	for _, ev := range es {
		if ev == nil {
			continue
		}
		key := hashEvent(ev)
		lastTS, seen := o.eventIndex.Get(key)
		// drop only if we have seen this tag set before, the timestamp differs,
		// and the elapsed time scaled by the limit is below one second.
		if seen && ev.Timestamp != lastTS {
			elapsed := float64(ev.Timestamp-lastTS) * o.PerSecondLimit
			if int64(elapsed) < oneSecond {
				o.logger.Printf("dropping event val %.2f lower than configured rate", elapsed)
				continue
			}
		}
		// remember the timestamp of the last event that passed through
		o.eventIndex.Add(key, ev.Timestamp)
		passed = append(passed, ev)
	}

	return passed
}

// hashEvent returns a hex-encoded SHA-256 digest of the event's tags.
// Tag keys are sorted first so that the digest is independent of map
// iteration order; each pair is written as "key=value\n" to avoid
// concatenation collisions between distinct tag sets.
func hashEvent(e *formatters.EventMsg) string {
	keys := make([]string, 0, len(e.Tags))
	for k := range e.Tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	digest := sha256.New()
	for _, k := range keys {
		digest.Write([]byte(k))
		digest.Write(eqChar)
		digest.Write([]byte(e.Tags[k]))
		digest.Write(lfChar)
	}

	return hex.EncodeToString(digest.Sum(nil))
}

// WithLogger installs a logger on the processor. It is a no-op unless Debug
// is enabled; with Debug on, it reuses the given logger's writer and flags,
// or falls back to stderr with default flags when l is nil.
func (o *RateLimit) WithLogger(l *log.Logger) {
	if !o.Debug {
		return
	}
	if l != nil {
		o.logger = log.New(l.Writer(), loggingPrefix, l.Flags())
		return
	}
	o.logger = log.New(os.Stderr, loggingPrefix, utils.DefaultLoggingFlags)
}

// WithTargets is a no-op: this processor does not use target configuration.
func (o *RateLimit) WithTargets(tcs map[string]*types.TargetConfig) {}

// WithActions is a no-op: this processor does not use actions.
func (o *RateLimit) WithActions(act map[string]map[string]interface{}) {}
Loading

0 comments on commit 0436882

Please sign in to comment.