diff --git a/bigcache.go b/bigcache.go index afe10add..9589b7bd 100644 --- a/bigcache.go +++ b/bigcache.go @@ -134,6 +134,15 @@ func (c *BigCache) Set(key string, entry []byte) error { return shard.set(key, hashedKey, entry) } +// Append appends entry under the key if key exists, otherwise +// it will set the key (same behaviour as Set()). With Append() you can +// concatenate multiple entries under the same key in an lock optimized way. +func (c *BigCache) Append(key string, entry []byte) error { + hashedKey := c.hash.Sum64(key) + shard := c.getShard(hashedKey) + return shard.append(key, hashedKey, entry) +} + // Delete removes the key func (c *BigCache) Delete(key string) error { hashedKey := c.hash.Sum64(key) diff --git a/bigcache_test.go b/bigcache_test.go index 48d5efca..cacb2db8 100644 --- a/bigcache_test.go +++ b/bigcache_test.go @@ -6,6 +6,7 @@ import ( "math" "math/rand" "runtime" + "strings" "sync" "testing" "time" @@ -27,6 +28,119 @@ func TestWriteAndGetOnCache(t *testing.T) { assertEqual(t, value, cachedValue) } +func TestAppendAndGetOnCache(t *testing.T) { + t.Parallel() + + // given + cache, _ := NewBigCache(DefaultConfig(5 * time.Second)) + key := "key" + value1 := make([]byte, 50) + rand.Read(value1) + value2 := make([]byte, 50) + rand.Read(value2) + value3 := make([]byte, 50) + rand.Read(value3) + + // when + _, err := cache.Get(key) + + // then + assertEqual(t, ErrEntryNotFound, err) + + // when + cache.Append(key, value1) + cachedValue, err := cache.Get(key) + + // then + noError(t, err) + assertEqual(t, value1, cachedValue) + + // when + cache.Append(key, value2) + cachedValue, err = cache.Get(key) + + // then + noError(t, err) + expectedValue := value1 + expectedValue = append(expectedValue, value2...) + assertEqual(t, expectedValue, cachedValue) + + // when + cache.Append(key, value3) + cachedValue, err = cache.Get(key) + + // then + noError(t, err) + expectedValue = value1 + expectedValue = append(expectedValue, value2...) + expectedValue = append(expectedValue, value3...) + assertEqual(t, expectedValue, cachedValue) +} + +// TestAppendRandomly does simultaneous appends to check for corruption errors. +func TestAppendRandomly(t *testing.T) { + t.Parallel() + + c := Config{ + Shards: 1, + LifeWindow: 5 * time.Second, + CleanWindow: 1 * time.Second, + MaxEntriesInWindow: 1000 * 10 * 60, + MaxEntrySize: 500, + StatsEnabled: true, + Verbose: true, + Hasher: newDefaultHasher(), + HardMaxCacheSize: 1, + Logger: DefaultLogger(), + } + cache, err := NewBigCache(c) + noError(t, err) + + nKeys := 5 + nAppendsPerKey := 2000 + nWorker := 10 + var keys []string + for i := 0; i < nKeys; i++ { + for j := 0; j < nAppendsPerKey; j++ { + keys = append(keys, fmt.Sprintf("key%d", i)) + } + } + rand.Shuffle(len(keys), func(i, j int) { + keys[i], keys[j] = keys[j], keys[i] + }) + + jobs := make(chan string, len(keys)) + for _, key := range keys { + jobs <- key + } + close(jobs) + + var wg sync.WaitGroup + for i := 0; i < nWorker; i++ { + wg.Add(1) + go func() { + for { + key, ok := <-jobs + if !ok { + break + } + cache.Append(key, []byte(key)) + } + wg.Done() + }() + } + wg.Wait() + + assertEqual(t, nKeys, cache.Len()) + for i := 0; i < nKeys; i++ { + key := fmt.Sprintf("key%d", i) + expectedValue := []byte(strings.Repeat(key, nAppendsPerKey)) + cachedValue, err := cache.Get(key) + noError(t, err) + assertEqual(t, expectedValue, cachedValue) + } +} + func TestConstructCacheWithDefaultHasher(t *testing.T) { t.Parallel() diff --git a/shard.go b/shard.go index 32220852..54eb65e4 100644 --- a/shard.go +++ b/shard.go @@ -84,6 +84,24 @@ func (s *cacheShard) get(key string, hashedKey uint64) ([]byte, error) { return entry, nil } +func (s *cacheShard) getWithoutLock(key string, hashedKey uint64) ([]byte, error) { + wrappedEntry, err := s.getWrappedEntry(hashedKey) + if err != nil { + return nil, err + } + if entryKey := readKeyFromEntry(wrappedEntry); key != entryKey { + s.collision() + if s.isVerbose { + s.logger.Printf("Collision detected. Both %q and %q have the same hash %x", key, entryKey, hashedKey) + } + return nil, ErrEntryNotFound + } + entry := readEntry(wrappedEntry) + s.hitWithoutLock(hashedKey) + + return entry, nil +} + func (s *cacheShard) getWrappedEntry(hashedKey uint64) ([]byte, error) { itemIndex := s.hashmap[hashedKey] @@ -131,6 +149,51 @@ func (s *cacheShard) set(key string, hashedKey uint64, entry []byte) error { } } +func (s *cacheShard) setWithoutLock(key string, hashedKey uint64, entry []byte) error { + currentTimestamp := uint64(s.clock.epoch()) + + if previousIndex := s.hashmap[hashedKey]; previousIndex != 0 { + if previousEntry, err := s.entries.Get(int(previousIndex)); err == nil { + resetKeyFromEntry(previousEntry) + } + } + + if oldestEntry, err := s.entries.Peek(); err == nil { + s.onEvict(oldestEntry, currentTimestamp, s.removeOldestEntry) + } + + w := wrapEntry(currentTimestamp, hashedKey, key, entry, &s.entryBuffer) + + for { + if index, err := s.entries.Push(w); err == nil { + s.hashmap[hashedKey] = uint32(index) + return nil + } + if s.removeOldestEntry(NoSpace) != nil { + return fmt.Errorf("entry is bigger than max shard size") + } + } +} + +func (s *cacheShard) append(key string, hashedKey uint64, entry []byte) error { + s.lock.Lock() + var newEntry []byte + oldEntry, err := s.getWithoutLock(key, hashedKey) + if err != nil { + if err != ErrEntryNotFound { + s.lock.Unlock() + return err + } + } else { + newEntry = oldEntry + } + + newEntry = append(newEntry, entry...) + err = s.setWithoutLock(key, hashedKey, newEntry) + s.lock.Unlock() + return err +} + func (s *cacheShard) del(hashedKey uint64) error { // Optimistic pre-check using only readlock s.lock.RLock() @@ -287,6 +350,13 @@ func (s *cacheShard) hit(key uint64) { } } +func (s *cacheShard) hitWithoutLock(key uint64) { + atomic.AddInt64(&s.stats.Hits, 1) + if s.statsEnabled { + s.hashmapStats[key]++ + } +} + func (s *cacheShard) miss() { atomic.AddInt64(&s.stats.Misses, 1) }