Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sweep expired rate limiter buckets #290

Merged
merged 9 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"strings"
"sync"
"time"

middleware "github.com/grpc-ecosystem/go-grpc-middleware"
prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
Expand Down Expand Up @@ -87,9 +88,15 @@ func (s *Server) startGRPC() error {
stream = append(stream, telemetryInterceptor.Stream())

if s.Config.Authn.Enable {
limiter := ratelimiter.NewTokenBucketRateLimiter(s.Log)
// Expire buckets after 1 hour of inactivity,
// sweep for expired buckets every 10 minutes.
// Note: entry expiration should be at least some multiple of
// maximum (limit max / limit rate) minutes.
go limiter.Janitor(s.ctx, 10*time.Minute, 1*time.Hour)
s.authorizer = NewWalletAuthorizer(&AuthnConfig{
AuthnOptions: s.Config.Authn,
Limiter: ratelimiter.NewTokenBucketRateLimiter(s.Log),
Limiter: limiter,
AllowLister: s.Config.AllowLister,
Log: s.Log.Named("authn"),
})
Expand Down
81 changes: 81 additions & 0 deletions pkg/ratelimiter/buckets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package ratelimiter

import (
"sync"
"time"

"go.uber.org/zap"
)

// Buckets is a lock-protected map of rate-limiter entries keyed by bucket
// name (e.g. a wallet address or client IP).
type Buckets struct {
	log *zap.Logger
	// mutex guards all access to the buckets map; entries themselves
	// carry their own mutex for token accounting.
	mutex sync.RWMutex
	buckets map[string]*Entry
}

// NewBuckets returns an empty bucket map that logs through the given logger.
func NewBuckets(log *zap.Logger) *Buckets {
	// The zero-value RWMutex is ready to use, so only the map needs init.
	return &Buckets{
		log:     log,
		buckets: map[string]*Entry{},
	}
}

func (b *Buckets) getAndRefill(bucket string, limit *Limit, multiplier uint16, createIfMissing bool) *Entry {
// The locking strategy is adapted from the following blog post: https://misfra.me/optimizing-concurrent-map-access-in-go/
b.mutex.RLock()
currentVal, exists := b.buckets[bucket]
b.mutex.RUnlock()
if !exists {
if !createIfMissing {
return nil
}
b.mutex.Lock()
currentVal, exists = b.buckets[bucket]
mkobetic marked this conversation as resolved.
Show resolved Hide resolved
if !exists {
currentVal = &Entry{
tokens: uint16(limit.MaxTokens * multiplier),
lastSeen: time.Now(),
mutex: sync.Mutex{},
}
b.buckets[bucket] = currentVal
b.mutex.Unlock()

return currentVal
}
b.mutex.Unlock()
}

limit.Refill(currentVal, multiplier)
return currentVal
}

// deleteExpired removes every entry whose lastSeen is older than
// expiresAfter and returns how many entries were removed.
func (b *Buckets) deleteExpired(expiresAfter time.Duration) (deleted int) {
	// Scan for candidates under the read lock only, so regular
	// traffic can keep reading the map concurrently.
	b.mutex.RLock()
	var stale []string
	for key, entry := range b.buckets {
		if time.Since(entry.lastSeen) > expiresAfter {
			stale = append(stale, key)
		}
	}
	b.mutex.RUnlock()
	if len(stale) == 0 {
		return deleted
	}
	b.log.Info("found expired buckets", zap.Int("count", len(stale)))
	// Take the write lock per key rather than for the whole batch,
	// so readers are never locked out for long.
	for _, key := range stale {
		b.mutex.Lock()
		// Re-check expiry: the entry may have been refreshed since the scan.
		if entry, ok := b.buckets[key]; ok && time.Since(entry.lastSeen) > expiresAfter {
			delete(b.buckets, key)
			deleted++
		}
		b.mutex.Unlock()
	}
	b.log.Info("deleted expired buckets", zap.Int("count", deleted))
	return deleted
}
55 changes: 35 additions & 20 deletions pkg/ratelimiter/rate_limiter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ratelimiter

import (
"context"
"errors"
"sync"
"time"
Expand Down Expand Up @@ -69,8 +70,9 @@ func (l Limit) Refill(entry *Entry, multiplier uint16) {
// TokenBucketRateLimiter implements the RateLimiter interface
type TokenBucketRateLimiter struct {
log *zap.Logger
buckets map[string]*Entry
mutex sync.RWMutex
newBuckets *Buckets // buckets that can be added to
oldBuckets *Buckets // buckets to be swept for expired entries
PriorityMultiplier uint16
Limits map[LimitType]*Limit
}
Expand All @@ -79,8 +81,8 @@ func NewTokenBucketRateLimiter(log *zap.Logger) *TokenBucketRateLimiter {
tb := new(TokenBucketRateLimiter)
tb.log = log.Named("ratelimiter")
// TODO: need to periodically clear out expired items to avoid unlimited growth of the map.
tb.buckets = make(map[string]*Entry)
tb.mutex = sync.RWMutex{}
tb.newBuckets = NewBuckets(log.Named("buckets1"))
tb.oldBuckets = NewBuckets(log.Named("buckets2"))
tb.PriorityMultiplier = PRIORITY_MULTIPLIER
tb.Limits = map[LimitType]*Limit{
DEFAULT: {DEFAULT_MAX_TOKENS, DEFAULT_RATE_PER_MINUTE},
Expand All @@ -103,25 +105,14 @@ func (rl *TokenBucketRateLimiter) fillAndReturnEntry(limitType LimitType, bucket
if isPriority {
multiplier = rl.PriorityMultiplier
}
// The locking strategy is adapted from the following blog post: https://misfra.me/optimizing-concurrent-map-access-in-go/
rl.mutex.RLock()
mkobetic marked this conversation as resolved.
Show resolved Hide resolved
currentVal, exists := rl.buckets[bucket]
rl.mutex.RUnlock()
if !exists {
rl.mutex.Lock()
currentVal = &Entry{
tokens: uint16(limit.MaxTokens * multiplier),
lastSeen: time.Now(),
mutex: sync.Mutex{},
}
rl.buckets[bucket] = currentVal
rl.mutex.Unlock()

return currentVal
if entry := rl.oldBuckets.getAndRefill(bucket, limit, multiplier, false); entry != nil {
rl.mutex.RUnlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

defer rl.mutex.RUnlock() would work here, since it is always getting unlocked at the end

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would, but I was avoiding defer on these short paths as we do elsewhere.

return entry
}

limit.Refill(currentVal, multiplier)
return currentVal
entry := rl.newBuckets.getAndRefill(bucket, limit, multiplier, true)
rl.mutex.RUnlock()
return entry
}

// The Spend function takes a bucket and a boolean asserting whether to apply the PRIORITY or the REGULAR rate limits.
Expand All @@ -140,6 +131,30 @@ func (rl *TokenBucketRateLimiter) Spend(limitType LimitType, bucket string, cost
return nil
}

// Janitor periodically sweeps out expired bucket entries until ctx is
// cancelled. Run it in its own goroutine: sweepInterval controls how often
// a sweep runs, expiresAfter how long an idle entry survives.
func (rl *TokenBucketRateLimiter) Janitor(ctx context.Context, sweepInterval, expiresAfter time.Duration) {
	ticker := time.NewTicker(sweepInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			rl.sweepAndSwap(expiresAfter)
		case <-ctx.Done():
			// Shut down cleanly when the server context is cancelled.
			return
		}
	}
}

// sweepAndSwap deletes expired entries from the old generation of buckets,
// then swaps the generations so the freshly-swept map becomes the one that
// accepts new entries. It returns how many entries were deleted.
func (rl *TokenBucketRateLimiter) sweepAndSwap(expiresAfter time.Duration) (deletedEntries int) {
	// Only the janitor writes to oldBuckets (see below), so we shouldn't need to rlock it here.
	deletedEntries = rl.oldBuckets.deleteExpired(expiresAfter)
	rl.mutex.Lock()
	rl.oldBuckets, rl.newBuckets = rl.newBuckets, rl.oldBuckets
	rl.mutex.Unlock()
	rl.newBuckets.log.Info("became new buckets")
	rl.oldBuckets.log.Info("became old buckets")
	return deletedEntries
}

func minUint16(x, y uint16) uint16 {
if x <= y {
return x
Expand Down
113 changes: 80 additions & 33 deletions pkg/ratelimiter/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@ const walletAddress = "0x1234"
func TestSpend(t *testing.T) {
logger, _ := zap.NewDevelopment()
rl := NewTokenBucketRateLimiter(logger)
rl.buckets[walletAddress] = &Entry{
lastSeen: time.Now(),
tokens: uint16(1),
mutex: sync.Mutex{},
}
rl.newBuckets.getAndRefill(walletAddress, &Limit{1, 0}, 1, true)

err1 := rl.Spend(DEFAULT, walletAddress, 1, false)
require.NoError(t, err1)
Expand All @@ -42,12 +38,9 @@ func TestSpendWithTime(t *testing.T) {
logger, _ := zap.NewDevelopment()
rl := NewTokenBucketRateLimiter(logger)
rl.Limits[DEFAULT] = &Limit{100, 1}
rl.buckets[walletAddress] = &Entry{
// Set the last seen to 1 minute ago
lastSeen: time.Now().Add(-1 * time.Minute),
tokens: uint16(0),
mutex: sync.Mutex{},
}
entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true)
// Set the last seen to 1 minute ago
entry.lastSeen = time.Now().Add(-1 * time.Minute)
err1 := rl.Spend(DEFAULT, walletAddress, 1, false)
require.NoError(t, err1)
err2 := rl.Spend(DEFAULT, walletAddress, 1, false)
Expand All @@ -58,41 +51,31 @@ func TestSpendWithTime(t *testing.T) {
// TestSpendMaxBucket verifies that a refill after a long idle period is
// capped at the limit's maximum token count.
//
// Note: the recovered source interleaved the removed and added sides of the
// diff without markers; this is the post-merge (added) version only.
func TestSpendMaxBucket(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	rl := NewTokenBucketRateLimiter(logger)
	entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true)
	// Set last seen to 500 minutes ago
	entry.lastSeen = time.Now().Add(-500 * time.Minute)
	entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, false)
	require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS)
}

// Ensure that the allow list is being correctly applied
//
// Note: the recovered source interleaved the removed and added sides of the
// diff without markers; this is the post-merge (added) version only.
func TestSpendAllowListed(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	rl := NewTokenBucketRateLimiter(logger)
	entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true)
	// Set last seen to 5 minutes ago
	entry.lastSeen = time.Now().Add(-5 * time.Minute)
	// Priority (allow-listed) callers refill at PRIORITY_MULTIPLIER times the rate.
	entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true)
	require.Equal(t, entry.tokens, uint16(5*DEFAULT_RATE_PER_MINUTE*PRIORITY_MULTIPLIER))
}

// TestMaxUint16 verifies that an extremely long idle period does not
// overflow the uint16 token counter: the refill is clamped at the maximum.
//
// Note: the recovered source interleaved the removed and added sides of the
// diff without markers; this is the post-merge (added) version only.
func TestMaxUint16(t *testing.T) {
	logger, _ := zap.NewDevelopment()
	rl := NewTokenBucketRateLimiter(logger)
	entry := rl.newBuckets.getAndRefill(walletAddress, &Limit{0, 0}, 1, true)
	// Set last seen to 1 million minutes ago
	entry.lastSeen = time.Now().Add(-1000000 * time.Minute)
	entry = rl.fillAndReturnEntry(DEFAULT, walletAddress, true)
	require.Equal(t, entry.tokens, DEFAULT_MAX_TOKENS*PRIORITY_MULTIPLIER)
}

Expand All @@ -114,3 +97,67 @@ func TestSpendConcurrent(t *testing.T) {
entry := rl.fillAndReturnEntry(PUBLISH, walletAddress, false)
require.Equal(t, entry.tokens, uint16(0))
}

// TestBucketExpiration drives sweepAndSwap manually (no Janitor goroutine)
// and checks that an entry expires only after surviving two sweeps without
// a refresh, while entries touched between sweeps keep their spent state.
// Bucket1/bucket2 in the comments refer to the two generations that
// sweepAndSwap alternates between.
func TestBucketExpiration(t *testing.T) {
	// Set things up so that entries are expired after two sweep intervals
	expiresAfter := 100 * time.Millisecond
	sweepInterval := 60 * time.Millisecond

	logger, _ := zap.NewDevelopment()
	rl := NewTokenBucketRateLimiter(logger)
	rl.Limits[DEFAULT] = &Limit{2, 0} // 2 tokens, no refill

	require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 add
	require.NoError(t, rl.Spend(DEFAULT, "ip2", 1, false)) // bucket1 add

	time.Sleep(sweepInterval)
	require.Equal(t, 0, rl.sweepAndSwap(expiresAfter)) // sweep bucket2 and swap

	require.NoError(t, rl.Spend(DEFAULT, "ip2", 1, false)) // bucket1 refresh
	require.NoError(t, rl.Spend(DEFAULT, "ip3", 1, false)) // bucket2 add

	time.Sleep(sweepInterval)
	require.Equal(t, 1, rl.sweepAndSwap(expiresAfter)) // sweep bucket1 and swap, delete ip1

	// ip2 has been refreshed every 60ms so it should still be out of tokens
	require.Error(t, rl.Spend(DEFAULT, "ip2", 1, false)) // bucket1 refresh
	// ip1 entry should have expired by now, so we should have 2 tokens again
	require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 add
	require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 refresh
	require.Error(t, rl.Spend(DEFAULT, "ip1", 1, false))   // bucket1 refresh

	time.Sleep(sweepInterval)
	require.Equal(t, 1, rl.sweepAndSwap(expiresAfter)) // sweep bucket2 and swap, delete ip3

	// ip2 should still be out of tokens
	require.Error(t, rl.Spend(DEFAULT, "ip2", 1, false)) // bucket1 refresh
	// ip3 should have expired now and we should have 2 tokens again
	require.NoError(t, rl.Spend(DEFAULT, "ip3", 1, false)) // bucket2 add
	require.NoError(t, rl.Spend(DEFAULT, "ip3", 1, false)) // bucket2 refresh
	require.Error(t, rl.Spend(DEFAULT, "ip3", 1, false))   // bucket2 refresh
}

func TestBucketExpirationIntegrity(t *testing.T) {
mkobetic marked this conversation as resolved.
Show resolved Hide resolved
expiresAfter := 10 * time.Millisecond
logger, _ := zap.NewDevelopment()
rl := NewTokenBucketRateLimiter(logger)
rl.Limits[DEFAULT] = &Limit{2, 0} // 2 tokens, no refill

require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 add

require.Equal(t, 0, rl.sweepAndSwap(expiresAfter)) // sweep bucket2 and swap

require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 refresh
require.Error(t, rl.Spend(DEFAULT, "ip1", 1, false)) // should be out of tokens now

require.Equal(t, 0, rl.sweepAndSwap(expiresAfter)) // sweep bucket1 and swap

require.Error(t, rl.Spend(DEFAULT, "ip1", 1, false)) // should still be out of tokens

require.Equal(t, 0, rl.sweepAndSwap(expiresAfter)) // sweep bucket2 and swap

time.Sleep(2 * expiresAfter) // wait until ip1 expires
require.Equal(t, 1, rl.sweepAndSwap(expiresAfter)) // sweep bucket1 and swap, delete ip1

require.NoError(t, rl.Spend(DEFAULT, "ip1", 1, false)) // bucket1 add
}