From 09ad427372a36519b3ef64c6678d755e749b51ef Mon Sep 17 00:00:00 2001 From: nesty92 Date: Sat, 11 Jan 2025 19:29:54 +0100 Subject: [PATCH] feat: implement Sliding Window Bloom Filter (#715) This feature enhances the existing Bloom Filter capabilities by allowing time-based item tracking, making it suitable for use cases requiring temporary membership checks. Signed-off-by: Ernesto Alejandro Santana Hidalgo --- rueidisprob/README.md | 59 +++ rueidisprob/slidingbloomfilter.go | 446 +++++++++++++++++ rueidisprob/slidingbloomfilter_test.go | 657 +++++++++++++++++++++++++ rueidisprob/synp.go | 20 + 4 files changed, 1182 insertions(+) create mode 100644 rueidisprob/slidingbloomfilter.go create mode 100644 rueidisprob/slidingbloomfilter_test.go create mode 100644 rueidisprob/synp.go diff --git a/rueidisprob/README.md b/rueidisprob/README.md index 2d529b5d..0dff8957 100644 --- a/rueidisprob/README.md +++ b/rueidisprob/README.md @@ -134,3 +134,62 @@ func main() { fmt.Println(count) // 1 } ``` + +### Sliding Window Bloom Filter + +It is a variation of the standard Bloom filter that adds a sliding window mechanism. +Useful for use cases where you need to keep track of items for a certain amount of time. + +Example: + +```go +package main + +import ( + "context" + "fmt" + "time" + + "github.com/redis/rueidis" + "github.com/redis/rueidis/rueidisprob" +) + +func main() { + client, err := rueidis.NewClient(rueidis.ClientOption{ + InitAddress: []string{"localhost:6379"}, + }) + if err != nil { + panic(err) + } + + sbf, err := NewSlidingBloomFilter(client, "sliding_bloom_filter", 1000, 0.01, time.Minute) + + err = sbf.Add(context.Background(), "hello") + if err != nil { + panic(err) + } + + err = sbf.Add(context.Background(), "world") + if err != nil { + panic(err) + } + + exists, err := sbf.Exists(context.Background(), "hello") + if err != nil { + panic(err) + } + fmt.Println(exists) // true + + exists, err = sbf.Exists(context.Background(), "world") + if err != nil { + panic(err) + } + fmt.Println(exists) // true + + count, err := sbf.Count(context.Background()) + if err != nil { + panic(err) + } + fmt.Println(count) // 2 +} +``` \ No newline at end of file diff --git a/rueidisprob/slidingbloomfilter.go b/rueidisprob/slidingbloomfilter.go new file mode 100644 index 00000000..fc3337a0 --- /dev/null +++ b/rueidisprob/slidingbloomfilter.go @@ -0,0 +1,446 @@ +package rueidisprob + +import ( + "context" + "errors" + "strconv" + "time" + + "github.com/redis/rueidis" +) + +const ( + slidingBloomFilterInitializeScript = ` +local filterKey = KEYS[1] +local nextFilterKey = KEYS[2] +local counterKey = KEYS[3] +local nextCounterKey = KEYS[4] +local lastRotationKey = KEYS[5] +local windowHalf = tonumber(ARGV[1]) + +if redis.call('EXISTS', filterKey, nextFilterKey, counterKey, nextCounterKey, lastRotationKey) == 0 then + local time = redis.call('TIME') + local current_time = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2]) / 1000) + + redis.call('MSET', filterKey, "", counterKey, 0, nextFilterKey, "", nextCounterKey, 0) + redis.call('SET', lastRotationKey, tostring(current_time), 'PX', windowHalf, 'NX') +end + +return 1 +` + + slidingBloomFilterAddMultiScript = ` +local hashIterations = tonumber(ARGV[1]) +local windowHalf = tonumber(ARGV[2]) +local numElements = tonumber(#ARGV) - 2 + +local filterKey = KEYS[1] +local nextFilterKey = KEYS[2] +local counterKey = KEYS[3] +local nextCounterKey = KEYS[4] +local lastRotationKey = KEYS[5] + +local time = redis.call('TIME') +local current_time = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2])/1000) +local acquiredLock = redis.call('SET', lastRotationKey, tostring(current_time), 'PX', windowHalf, 'NX') + +if acquiredLock then + redis.call('RENAME', nextFilterKey, filterKey) + redis.call('RENAME', nextCounterKey, counterKey) + redis.call('SET', nextFilterKey, "") + redis.call('SET', nextCounterKey, 0) +end + +local counter = 0 +local oneBits = 0 +for i=1, numElements do + local bitset = redis.call('BITFIELD', filterKey, 'SET', 'u1', ARGV[i+2], '1') + redis.call('BITFIELD', nextFilterKey, 'SET', 'u1', ARGV[i+2], '1') + + oneBits = oneBits + bitset[1] + if i % hashIterations == 0 then + if oneBits ~= hashIterations then + counter = counter + 1 + end + + oneBits = 0 + end +end + +redis.call('INCRBY', nextCounterKey, counter) +return redis.call('INCRBY', counterKey, counter) +` + slidingBloomFilterExistsMultiScript = ` +local hashIterations = tonumber(ARGV[1]) +local windowHalf = tonumber(ARGV[2]) +local numElements = tonumber(#ARGV) - 2 + +local filterKey = KEYS[1] +local nextFilterKey = KEYS[2] +local counterKey = KEYS[3] +local nextCounterKey = KEYS[4] +local lastRotationKey = KEYS[5] + +local time = redis.call('TIME') +local current_time = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2])/1000) +local acquiredLock = redis.call('SET', lastRotationKey, tostring(current_time), 'PX', windowHalf, 'NX') + +if acquiredLock then + redis.call('RENAME', nextFilterKey, filterKey) + redis.call('RENAME', nextCounterKey, counterKey) + redis.call('SET', nextFilterKey, "") + redis.call('SET', nextCounterKey, 0) +end + +local result = {} +local oneBits = 0 +for i=1, numElements do + local index = tonumber(ARGV[i+2]) + local bitset = redis.call('BITFIELD', filterKey, 'GET', 'u1', index) + + oneBits = oneBits + bitset[1] + if i % hashIterations == 0 then + table.insert(result, oneBits == hashIterations) + + oneBits = 0 + end +end + +return result +` + + slidingBloomFilterExistsReadOnlyMultiScript = ` +local hashIterations = tonumber(ARGV[1]) +local windowHalf = tonumber(ARGV[2]) +local numElements = tonumber(#ARGV) - 2 + +local filterKey = KEYS[1] +local nextFilterKey = KEYS[2] +local counterKey = KEYS[3] +local nextCounterKey = KEYS[4] +local lastRotationKey = KEYS[5] + +local time = redis.call('TIME') +local current_time = tonumber(time[1]) * 1000 + math.floor(tonumber(time[2])/1000) +local acquiredLock = redis.call('SET', lastRotationKey, tostring(current_time), 'PX', windowHalf, 'NX') + +if acquiredLock then + redis.call('RENAME', nextFilterKey, filterKey) + redis.call('RENAME', nextCounterKey, counterKey) + redis.call('SET', nextFilterKey, "") + redis.call('SET', nextCounterKey, 0) +end + +local result = {} +local oneBits = 0 +for i=1, numElements do + local index = tonumber(ARGV[i+2]) + local bitset = redis.call('BITFIELD_RO', filterKey, 'GET', 'u1', index) + + oneBits = oneBits + bitset[1] + if i % hashIterations == 0 then + table.insert(result, oneBits == hashIterations) + + oneBits = 0 + end +end + +return result +` + + slidingBloomFilterResetScript = ` +local filterKey = KEYS[1] +local nextFilterKey = KEYS[2] +local counterKey = KEYS[3] +local nextCounterKey = KEYS[4] + +redis.call('RENAME', nextFilterKey, filterKey) +redis.call('RENAME', nextCounterKey, counterKey) +redis.call('SET', nextFilterKey, "") +redis.call('SET', nextCounterKey, 0) +` + + // Redis key suffixes + counterSuffix = ":c" + nextFilterSuffix = ":n" + nextCounterSuffix = ":nc" + lastRotationSuffix = ":lr" +) + +type Logger interface { + Error(msg string, args ...any) +} + +var ( + _ BloomFilter = (*slidingBloomFilter)(nil) + ErrWindowSizeLessThanOneSecond = errors.New("window size cannot be less than 1 second") +) + +type SlidingBloomFilterOptions struct { + enableReadOperation bool +} + +type SlidingBloomFilterOptionFunc func(o *SlidingBloomFilterOptions) + +func WithReadOnlyExists(enableReadOperations bool) SlidingBloomFilterOptionFunc { + return func(o *SlidingBloomFilterOptions) { + o.enableReadOperation = enableReadOperations + } +} + +type slidingBloomFilter struct { + client rueidis.Client + + // window is the duration of the sliding window. + window time.Duration + + // Pre-calculated window half in milliseconds + windowHalfMs string + + // name is the name of the sliding Bloom filter. + // It is used as a key in the Redis. + name string + + // counter is the name of the counter. + counter string + + // hashIterations is the number of hash functions to use. + hashIterations uint + hashIterationString string + + // size is the number of bits to use. + size uint + + addMultiScript *rueidis.Lua + addMultiKeys []string + + existsMultiScript *rueidis.Lua + existsMultiKeys []string +} + +// NewSlidingBloomFilter creates a new sliding window Bloom filter. +// NOTE: 'name:c' is used as a counter key in the Redis +// 'name:n' is used as a next filter key in the Redis +// 'name:nc' is used as a next counter key in the Redis +// 'name:lr' is used as a last rotation key in the Redis +// to keep track of the items in the window. +func NewSlidingBloomFilter( + redisClient rueidis.Client, + name string, + expectedNumberOfItems uint, + falsePositiveRate float64, + windowSize time.Duration, + opts ...SlidingBloomFilterOptionFunc, +) (BloomFilter, error) { + if len(name) == 0 { + return nil, ErrEmptyName + } + + if falsePositiveRate <= 0 { + return nil, ErrFalsePositiveRateLessThanEqualZero + } + if falsePositiveRate > 1 { + return nil, ErrFalsePositiveRateGreaterThanOne + } + if windowSize < time.Second { + return nil, ErrWindowSizeLessThanOneSecond + } + + size := numberOfBloomFilterBits(expectedNumberOfItems, falsePositiveRate) + if size == 0 { + return nil, ErrBitsSizeZero + } + if size > maxSize { + return nil, ErrBitsSizeTooLarge + } + hashIterations := numberOfBloomFilterHashFunctions(size, expectedNumberOfItems) + + options := &SlidingBloomFilterOptions{} + for _, opt := range opts { + opt(options) + } + + var existsMultiScript string + if options.enableReadOperation { + existsMultiScript = slidingBloomFilterExistsReadOnlyMultiScript + } else { + existsMultiScript = slidingBloomFilterExistsMultiScript + } + + // NOTE: https://redis.io/docs/reference/cluster-spec/#hash-tags + bfName := "{" + name + "}" + counterName := bfName + counterSuffix + nextFilterName := bfName + nextFilterSuffix + nextCounterName := bfName + nextCounterSuffix + lastRotationName := bfName + lastRotationSuffix + + s := &slidingBloomFilter{ + client: redisClient, + name: bfName, + counter: counterName, + window: windowSize, + windowHalfMs: strconv.FormatInt(windowSize.Milliseconds()/2, 10), + hashIterations: hashIterations, + hashIterationString: strconv.FormatUint(uint64(hashIterations), 10), + size: size, + addMultiScript: rueidis.NewLuaScript(slidingBloomFilterAddMultiScript), + addMultiKeys: []string{bfName, nextFilterName, counterName, nextCounterName, lastRotationName}, + existsMultiScript: rueidis.NewLuaScript(existsMultiScript), + existsMultiKeys: []string{bfName, nextFilterName, counterName, nextCounterName, lastRotationName}, + } + + err := s.initialize() + if err != nil { + return nil, err + } + + return s, nil +} + +func (s *slidingBloomFilter) initialize() error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + initializeScript := rueidis.NewLuaScript(slidingBloomFilterInitializeScript) + resp := initializeScript.Exec(ctx, s.client, s.addMultiKeys, []string{s.windowHalfMs}) + if resp.Error() != nil && !rueidis.IsRedisNil(resp.Error()) { + return resp.Error() + } + + v, err := resp.AsInt64() + if err != nil { + return err + } + + if v != 1 { + return errors.New("failed to initialize sliding Bloom filter") + } + + return nil +} + +func (s *slidingBloomFilter) Add(ctx context.Context, key string) error { + return s.AddMulti(ctx, []string{key}) +} + +func (s *slidingBloomFilter) AddMulti(ctx context.Context, keys []string) error { + if len(keys) == 0 { + return nil + } + + buf := bytesPool.Get(0, len(keys)*int(s.hashIterations)*8) + defer bytesPool.Put(buf) + + indexes := s.indexes(keys, &buf.s) + + args := make([]string, 0, len(indexes)+2) + args = append(args, s.hashIterationString) + args = append(args, s.windowHalfMs) + args = append(args, indexes...) + + resp := s.addMultiScript.Exec(ctx, s.client, s.addMultiKeys, args) + return resp.Error() +} + +func (s *slidingBloomFilter) indexes(keys []string, buf *[]byte) []string { + allIndexes := make([]string, 0, len(keys)*int(s.hashIterations)) + size := uint64(s.size) + + for _, key := range keys { + h1, h2 := hash([]byte(key)) + for i := uint(0); i < s.hashIterations; i++ { + offset := len(*buf) + *buf = strconv.AppendUint(*buf, index(h1, h2, i, size), 10) + allIndexes = append(allIndexes, rueidis.BinaryString((*buf)[offset:])) + } + } + return allIndexes +} + +func (s *slidingBloomFilter) Exists(ctx context.Context, key string) (bool, error) { + exists, err := s.ExistsMulti(ctx, []string{key}) + if err != nil { + return false, err + } + + return exists[0], nil +} + +func (s *slidingBloomFilter) ExistsMulti(ctx context.Context, keys []string) ([]bool, error) { + if len(keys) == 0 { + return nil, nil + } + + buf := bytesPool.Get(0, len(keys)*int(s.hashIterations)*8) + defer bytesPool.Put(buf) + + indexes := s.indexes(keys, &buf.s) + + args := make([]string, 0, len(indexes)+2) + args = append(args, s.hashIterationString) + args = append(args, s.windowHalfMs) + args = append(args, indexes...) + + resp := s.existsMultiScript.Exec(ctx, s.client, s.existsMultiKeys, args) + if resp.Error() != nil { + return nil, resp.Error() + } + + arr, err := resp.ToArray() + if err != nil { + return nil, err + } + + result := make([]bool, len(keys)) + for i, el := range arr { + v, err := el.AsBool() + if err != nil { + if rueidis.IsRedisNil(err) { + result[i] = false + continue + } + + return nil, err + } + + result[i] = v + } + return result, nil +} + +func (s *slidingBloomFilter) Reset(ctx context.Context) error { + resp := s.client.Do(ctx, + s.client.B(). + Eval(). + Script(slidingBloomFilterResetScript). + Numkeys(4). + Key(s.addMultiKeys...). + Build(), + ) + return resp.Error() +} + +func (s *slidingBloomFilter) Delete(ctx context.Context) error { + resp := s.client.Do(ctx, s.client.B().Del().Key(s.addMultiKeys...).Build()) + return resp.Error() +} + +func (s *slidingBloomFilter) Count(ctx context.Context) (uint64, error) { + resp := s.client.Do( + ctx, + s.client.B(). + Get(). + Key(s.counter). + Build(), + ) + count, err := resp.AsUint64() + if err != nil { + if rueidis.IsRedisNil(err) { + return 0, nil + } + + return 0, err + } + + return count, nil +} diff --git a/rueidisprob/slidingbloomfilter_test.go b/rueidisprob/slidingbloomfilter_test.go new file mode 100644 index 00000000..f7b0e4f0 --- /dev/null +++ b/rueidisprob/slidingbloomfilter_test.go @@ -0,0 +1,657 @@ +package rueidisprob + +import ( + "context" + "errors" + "fmt" + "math/rand" + "strconv" + "sync" + "testing" + "time" + + "github.com/redis/rueidis" +) + +func TestNewSlidingBloomFilter(t *testing.T) { + t.Run("default", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute) + if err != nil { + t.Error(err) + } + + if bf == nil { + t.Error("Bloom filter is nil") + } + sbf := bf.(*slidingBloomFilter) + if sbf.client == nil { + t.Error("Client is nil") + } + if sbf.name != "{test}" { + t.Error("Name is not {test}") + } + if sbf.hashIterations != 4 { + t.Error("Hash iterations is not 4") + } + if sbf.window != time.Minute { + t.Error("Window size is not 1 minute") + } + }) + + t.Run("with read operation enabled", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute, WithReadOnlyExists(true)) + if err != nil { + t.Error(err) + } + + if bf == nil { + t.Error("Bloom filter is nil") + } + }) +} + +func TestNewSlidingBloomFilterError(t *testing.T) { + t.Run("EmptyName", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + _, err = NewSlidingBloomFilter(client, "", 100, 0.05, time.Minute) + if !errors.Is(err, ErrEmptyName) { + t.Error("Error is not ErrEmptyName") + } + }) + + t.Run("NegativeFalsePositiveRate", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + _, err = NewSlidingBloomFilter(client, "test", 100, -0.01, time.Minute) + if !errors.Is(err, ErrFalsePositiveRateLessThanEqualZero) { + t.Error("Error is not ErrFalsePositiveRateLessThanEqualZero") + } + }) +} + +func TestSlidingBloomFilterAdd(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute) + if err != nil { + t.Error(err) + } + + err = bf.Add(context.Background(), "1") + if err != nil { + t.Error(err) + } + + exists, err := bf.Exists(context.Background(), "1") + if err != nil { + t.Error(err) + } + if !exists { + t.Error("Key test does not exist") + } +} + +func TestSlidingBloomFilterAddMulti(t *testing.T) { + t.Run("add multiple items", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute) + if err != nil { + t.Error(err) + } + + keys := []string{"1", "2", "3"} + err = bf.AddMulti(context.Background(), keys) + if err != nil { + t.Error(err) + } + + for _, key := range keys { + exists, err := bf.Exists(context.Background(), key) + if err != nil { + t.Error(err) + } + if !exists { + t.Errorf("Key %s does not exist", key) + } + } + }) + + t.Run("add empty items", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute) + if err != nil { + t.Error(err) + } + + err = bf.AddMulti(context.Background(), []string{}) + if err != nil { + t.Error(err) + } + }) +} + +func TestSlidingBloomFilterRotation(t *testing.T) { + t.Run("rotation after window", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + windowDuration := time.Second + rotationInterval := time.Millisecond * 600 + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, windowDuration) + if err != nil { + t.Error(err) + } + + err = bf.Add(context.Background(), "1") + if err != nil { + t.Error(err) + } + + exists, err := bf.Exists(context.Background(), "1") + if err != nil { + t.Error(err) + } + if !exists { + t.Error("Key should exist before rotation") + } + + // Wait for rotation + time.Sleep(rotationInterval) + + // Check that item is still in the filter + exists, err = bf.Exists(context.Background(), "1") + if err != nil { + t.Error(err) + } + if !exists { + t.Error("Key should exist in the filter") + } + + // Wait for another rotation + time.Sleep(rotationInterval) + + // Item should be gone after second rotation + exists, err = bf.Exists(context.Background(), "1") + if err != nil { + t.Error(err) + } + if exists { + t.Error("Key should not exist after second rotation") + } + }) +} + +func TestSlidingBloomFilterDelete(t *testing.T) { + t.Run("delete exists", func(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100, 0.05, time.Minute) + if err != nil { + t.Error(err) + } + + err = bf.Add(context.Background(), "1") + if err != nil { + t.Error(err) + } + + err = bf.Delete(context.Background()) + if err != nil { + t.Error(err) + } + + // Verify all keys are deleted + for _, key := range []string{"{test}", "{test}:n", "{test}:c", "{test}:nc", "{test}:lr"} { + resp := client.Do(context.Background(), client.B().Get().Key(key).Build()) + if !rueidis.IsRedisNil(resp.Error()) { + t.Errorf("Key %s still exists", key) + } + } + }) +} + +func BenchmarkSlidingBloomFilterAddMultiBigSize(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100000000, 0.01, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 10) + for i := 0; i < 10; i++ { + keys[i] = strconv.Itoa(i) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkSlidingBloomFilterAddMultiLowRate(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 1000000, 0.0000000001, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 10) + for i := 0; i < 10; i++ { + keys[i] = strconv.Itoa(i) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkSlidingBloomFilterAddMultiManyKeys(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 1000000, 0.01, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 200) + for i := 0; i < 200; i++ { + keys[i] = strconv.Itoa(i) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + err := bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkSlidingBloomFilterExistsMultiBigSize(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 100000000, 0.01, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 10) + for i := 0; i < 10; i++ { + keys[i] = strconv.Itoa(i) + } + err = bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + + var benchKeys []string + for i := 0; i < 10; i++ { + key := strconv.Itoa(rand.Intn(b.N)) + benchKeys = append(benchKeys, key) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := bf.ExistsMulti(context.Background(), benchKeys) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkSlidingBloomFilterExistsMultiLowRate(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 1000000, 0.0000000001, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 10) + for i := 0; i < 10; i++ { + keys[i] = strconv.Itoa(i) + } + err = bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + + var benchKeys []string + for i := 0; i < 10; i++ { + key := strconv.Itoa(rand.Intn(b.N)) + benchKeys = append(benchKeys, key) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := bf.ExistsMulti(context.Background(), benchKeys) + if err != nil { + b.Error(err) + } + } +} + +func BenchmarkSlidingBloomFilterExistsMultiManyKeys(b *testing.B) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + b.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + b.Error(err) + } + }() + + bf, err := NewSlidingBloomFilter(client, "test", 1000000, 0.01, time.Minute) + if err != nil { + b.Error(err) + } + + keys := make([]string, 200) + for i := 0; i < 200; i++ { + keys[i] = strconv.Itoa(i) + } + err = bf.AddMulti(context.Background(), keys) + if err != nil { + b.Error(err) + } + + var benchKeys []string + for i := 0; i < 200; i++ { + key := strconv.Itoa(rand.Intn(b.N)) + benchKeys = append(benchKeys, key) + } + + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, err := bf.ExistsMulti(context.Background(), benchKeys) + if err != nil { + b.Error(err) + } + } +} + +func TestSlidingBloomFilterConcurrentRotation(t *testing.T) { + client, flushAllAndClose, err := setupRedis7Cluster() + if err != nil { + t.Error(err) + } + defer func() { + err := flushAllAndClose() + if err != nil { + t.Error(err) + } + }() + + windowDuration := time.Second + numClients := 10 + + // Create multiple bloom filter instances + filters := make([]BloomFilter, numClients) + for i := 0; i < numClients; i++ { + bf, err := NewSlidingBloomFilter(client, "test", 1000, 0.01, windowDuration) + if err != nil { + t.Fatal(err) + } + filters[i] = bf + } + startTime := time.Now() + + // Add some initial items that should stay during first half window + initialKeys := []string{"initial1", "initial2", "initial3"} + err = filters[0].AddMulti(context.Background(), initialKeys) + if err != nil { + t.Fatal(err) + } + + // Create context with timeout for the entire test + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Run concurrent operations + var wg sync.WaitGroup + errChan := make(chan error, numClients) + + // Run client operations + for i := 0; i < numClients; i++ { + wg.Add(1) + go func(clientID int) { + defer wg.Done() + bf := filters[clientID] + + for { + select { + case <-ctx.Done(): + return + default: + key := fmt.Sprintf("client%d-key%d", clientID, time.Now().UnixNano()) + + // Randomly choose between Add and Exists operations + if rand.Float32() < 0.5 { + err := bf.Add(ctx, key) + if err != nil && !errors.Is(err, context.Canceled) { + errChan <- fmt.Errorf("client %d add error: %w", clientID, err) + return + } + } else { + _, err := bf.Exists(ctx, key) + if err != nil && !errors.Is(err, context.Canceled) { + errChan <- fmt.Errorf("client %d exists error: %w", clientID, err) + return + } + } + + time.Sleep(time.Millisecond) + } + } + }(i) + } + + newKeys := []string{"new1", "new2", "new3"} + go func() { + time.Sleep(windowDuration / 2) + err = filters[0].AddMulti(ctx, newKeys) + if err != nil { + panic(err) + } + }() + + // Wait for initial keys to disappear and verify it took at least windowDuration + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + + allGone := false + for !allGone { + select { + case <-ctx.Done(): + t.Fatal("context deadline exceeded before keys disappeared") + case <-ticker.C: + exists, err := filters[0].ExistsMulti(ctx, initialKeys) + if errors.Is(err, context.DeadlineExceeded) { + break + } + if err != nil { + t.Fatal(err) + } + + allGone = true + for _, exists := range exists { + if exists { + allGone = false + break + } + } + } + } + + // Verify new keys are still present + exists, err := filters[0].ExistsMulti(ctx, newKeys) + if err != nil { + t.Fatal(err) + } + for i, exists := range exists { + if !exists { + t.Errorf("new key %s not present", newKeys[i]) + } + } + + // Cancel context and wait for all operations to complete + cancel() + wg.Wait() + + // Check for any errors from goroutines + close(errChan) + for err := range errChan { + t.Error(err) + } + + t.Logf("Test ran for %v with %d clients", time.Since(startTime), numClients) +} diff --git a/rueidisprob/synp.go b/rueidisprob/synp.go new file mode 100644 index 00000000..6d353ecb --- /dev/null +++ b/rueidisprob/synp.go @@ -0,0 +1,20 @@ +package rueidisprob + +import "github.com/redis/rueidis/internal/util" + +var bytesPool = util.NewPool(func(capacity int) *bytesContainer { + return &bytesContainer{s: make([]byte, 0, capacity)} +}) + +type bytesContainer struct { + s []byte +} + +func (r *bytesContainer) Capacity() int { + return cap(r.s) +} + +func (r *bytesContainer) ResetLen(n int) { + clear(r.s) + r.s = r.s[:n] +}